Avoid to count child threads
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <signal.h>
23 #include <time.h>
24 #include <sys/syscall.h>
25 #include <pthread.h>
26 #include <errno.h>
27 #include <assert.h>
28
29 #include <systemd/sd-event.h>
30
31 #include "jobs.h"
32 #include "sig-monitor.h"
33 #include "verbose.h"
34
35 #if 0
36 #define _alert_ "do you really want to remove monitoring?"
37 #define sig_monitor_init_timeouts()  ((void)0)
38 #define sig_monitor_clean_timeouts() ((void)0)
39 #define sig_monitor(to,cb,arg)       (cb(0,arg))
40 #endif
41
42 /** Internal shortcut for callback */
43 typedef void (*job_cb_t)(int, void*, void *, void*);
44
45 /** Description of a pending job */
46 struct job
47 {
48         struct job *next;    /**< link to the next job enqueued */
49         void *group;         /**< group of the request */
50         job_cb_t callback;   /**< processing callback */
51         void *arg1;          /**< first arg */
52         void *arg2;          /**< second arg */
53         void *arg3;          /**< third arg */
54         int timeout;         /**< timeout in second for processing the request */
55         unsigned blocked: 1; /**< is an other request blocking this one ? */
56         unsigned dropped: 1; /**< is removed ? */
57 };
58
59 /** Description of handled event loops */
60 struct events
61 {
62         struct events *next;
63         struct sd_event *event;
64         unsigned runs: 1;
65 };
66
67 /** Description of threads */
68 struct thread
69 {
70         struct thread *next;   /**< next thread of the list */
71         struct thread *upper;  /**< upper same thread */
72         struct job *job;       /**< currently processed job */
73         struct events *events; /**< currently processed job */
74         pthread_t tid;         /**< the thread id */
75         unsigned stop: 1;      /**< stop requested */
76         unsigned lowered: 1;   /**< has a lower same thread */
77         unsigned waits: 1;     /**< is waiting? */
78 };
79
80 /* synchronisation of threads */
81 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
82 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
83
84 /* count allowed, started and waiting threads */
85 static int allowed = 0; /** allowed count of threads */
86 static int started = 0; /** started count of threads */
87 static int waiting = 0; /** waiting count of threads */
88 static int remains = 0; /** allowed count of waiting jobs */
89 static int nevents = 0; /** count of events */
90
91 /* list of threads */
92 static struct thread *threads;
93 static _Thread_local struct thread *current;
94
95 /* queue of pending jobs */
96 static struct job *first_job;
97 static struct events *first_events;
98 static struct job *free_jobs;
99
100 /**
101  * Create a new job with the given parameters
102  * @param group    the group of the job
103  * @param timeout  the timeout of the job (0 if none)
104  * @param callback the function that achieves the job
105  * @param arg1     the first argument of the callback
106  * @param arg2     the second argument of the callback
107  * @param arg3     the third argument of the callback
108  * @return the created job unblock or NULL when no more memory
109  */
110 static struct job *job_create(
111                 void *group,
112                 int timeout,
113                 job_cb_t callback,
114                 void *arg1,
115                 void *arg2,
116                 void *arg3)
117 {
118         struct job *job;
119
120         /* try recyle existing job */
121         job = free_jobs;
122         if (job)
123                 free_jobs = job->next;
124         else {
125                 /* allocation  without blocking */
126                 pthread_mutex_unlock(&mutex);
127                 job = malloc(sizeof *job);
128                 pthread_mutex_lock(&mutex);
129                 if (!job) {
130                         errno = -ENOMEM;
131                         goto end;
132                 }
133         }
134         /* initialises the job */
135         job->group = group;
136         job->timeout = timeout;
137         job->callback = callback;
138         job->arg1 = arg1;
139         job->arg2 = arg2;
140         job->arg3 = arg3;
141         job->blocked = 0;
142         job->dropped = 0;
143 end:
144         return job;
145 }
146
147 /**
148  * Adds 'job' at the end of the list of jobs, marking it
149  * as blocked if an other job with the same group is pending.
150  * @param job the job to add
151  */
152 static void job_add(struct job *job)
153 {
154         void *group;
155         struct job *ijob, **pjob;
156
157         /* prepare to add */
158         group = job->group;
159         job->next = NULL;
160
161         /* search end and blockers */
162         pjob = &first_job;
163         ijob = first_job;
164         while (ijob) {
165                 if (group && ijob->group == group)
166                         job->blocked = 1;
167                 pjob = &ijob->next;
168                 ijob = ijob->next;
169         }
170
171         /* queue the jobs */
172         *pjob = job;
173 }
174
175 /**
176  * Get the next job to process or NULL if none.
177  * @return the first job that isn't blocked or NULL
178  */
179 static inline struct job *job_get()
180 {
181         struct job *job = first_job;
182         while (job && job->blocked)
183                 job = job->next;
184         return job;
185 }
186
187 /**
188  * Get the next events to process or NULL if none.
189  * @return the first events that isn't running or NULL
190  */
191 static inline struct events *events_get()
192 {
193         struct events *events = first_events;
194         while (events && events->runs)
195                 events = events->next;
196         return events;
197 }
198
199 /**
200  * Releases the processed 'job': removes it
201  * from the list of jobs and unblock the first
202  * pending job of the same group if any.
203  * @param job the job to release
204  */
205 static inline void job_release(struct job *job)
206 {
207         struct job *ijob, **pjob;
208         void *group;
209
210         /* first unqueue the job */
211         pjob = &first_job;
212         ijob = first_job;
213         while (ijob != job) {
214                 pjob = &ijob->next;
215                 ijob = ijob->next;
216         }
217         *pjob = job->next;
218
219         /* then unblock jobs of the same group */
220         group = job->group;
221         if (group) {
222                 ijob = job->next;
223                 while (ijob && ijob->group != group)
224                         ijob = ijob->next;
225                 if (ijob)
226                         ijob->blocked = 0;
227         }
228
229         /* recycle the job */
230         job->next = free_jobs;
231         free_jobs = job;
232 }
233
234 /**
235  * Monitored normal callback for a job.
236  * This function is called by the monitor
237  * to run the job when the safe environment
238  * is set.
239  * @param signum 0 on normal flow or the number
240  *               of the signal that interrupted the normal
241  *               flow
242  * @param arg     the job to run
243  */
244 static void job_call(int signum, void *arg)
245 {
246         struct job *job = arg;
247         job->callback(signum, job->arg1, job->arg2, job->arg3);
248 }
249
250 /**
251  * Monitored cancel callback for a job.
252  * This function is called by the monitor
253  * to cancel the job when the safe environment
254  * is set.
255  * @param signum 0 on normal flow or the number
256  *               of the signal that interrupted the normal
257  *               flow, isn't used
258  * @param arg    the job to run
259  */
260 static void job_cancel(int signum, void *arg)
261 {
262         job_call(SIGABRT, arg);
263 }
264
265 /**
266  * Monitored normal callback for events.
267  * This function is called by the monitor
268  * to run the event loop when the safe environment
269  * is set.
270  * @param signum 0 on normal flow or the number
271  *               of the signal that interrupted the normal
272  *               flow
273  * @param arg     the events to run
274  */
275 static void events_call(int signum, void *arg)
276 {
277         struct events *events = arg;
278         if (!signum)
279                 sd_event_run(events->event, (uint64_t) -1);
280 }
281
282 /**
283  * Main processing loop of threads processing jobs.
284  * The loop must be called with the mutex locked
285  * and it returns with the mutex locked.
286  * @param me the description of the thread to use
287  * TODO: how are timeout handled when reentering?
288  */
289 static void thread_run(volatile struct thread *me)
290 {
291         struct thread **prv;
292         struct job *job;
293         struct events *events;
294
295         /* initialize description of itself and link it in the list */
296         me->tid = pthread_self();
297         me->stop = 0;
298         me->lowered = 0;
299         me->waits = 0;
300         me->upper = current;
301         if (current) {
302                 current->lowered = 1;
303         } else {
304                 started++;
305                 sig_monitor_init_timeouts();
306         }
307         me->next = threads;
308         threads = (struct thread*)me;
309         current = (struct thread*)me;
310
311         NOTICE("job thread starting %d(/%d) %s", started, allowed, me->upper ? "child" : "parent");
312
313         /* loop until stopped */
314         me->events = NULL;
315         while (!me->stop) {
316                 /* get a job */
317                 job = job_get(first_job);
318                 if (job) {
319                         /* prepare running the job */
320                         remains++; /* increases count of job that can wait */
321                         job->blocked = 1; /* mark job as blocked */
322                         me->job = job; /* record the job (only for terminate) */
323
324                         /* run the job */
325                         pthread_mutex_unlock(&mutex);
326                         sig_monitor(job->timeout, job_call, job);
327                         pthread_mutex_lock(&mutex);
328
329                         /* release the run job */
330                         job_release(job);
331
332                         /* release event if any */
333                         events = me->events;
334                         if (events) {
335                                 events->runs = 0;
336                                 me->events = NULL;
337                         }
338                 } else {
339                         /* no job, check events */
340                         events = events_get();
341                         if (events) {
342                                 /* run the events */
343                                 events->runs = 1;
344                                 me->events = events;
345                                 pthread_mutex_unlock(&mutex);
346                                 sig_monitor(0, events_call, events);
347                                 pthread_mutex_lock(&mutex);
348                                 events->runs = 0;
349                                 me->events = NULL;
350                         } else {
351                                 /* no job and not events */
352                                 waiting++;
353                                 me->waits = 1;
354                                 pthread_cond_wait(&cond, &mutex);
355                                 me->waits = 0;
356                                 waiting--;
357                         }
358                 }
359         }
360         NOTICE("job thread stoping %d(/%d) %s", started, allowed, me->upper ? "child" : "parent");
361
362         /* unlink the current thread and cleanup */
363         prv = &threads;
364         while (*prv != me)
365                 prv = &(*prv)->next;
366         *prv = me->next;
367         current = me->upper;
368         if (current) {
369                 current->lowered = 0;
370         } else {
371                 sig_monitor_clean_timeouts();
372                 started--;
373         }
374 }
375
376 /**
377  * Entry point for created threads.
378  * @param data not used
379  * @return NULL
380  */
381 static void *thread_main(void *data)
382 {
383         struct thread me;
384
385         pthread_mutex_lock(&mutex);
386         thread_run(&me);
387         pthread_mutex_unlock(&mutex);
388         return NULL;
389 }
390
391 /**
392  * Starts a new thread
393  * @return 0 in case of success or -1 in case of error
394  */
395 static int start_one_thread()
396 {
397         pthread_t tid;
398         int rc;
399
400         rc = pthread_create(&tid, NULL, thread_main, NULL);
401         if (rc != 0) {
402                 /* errno = rc; */
403                 WARNING("not able to start thread: %m");
404                 rc = -1;
405         }
406         return rc;
407 }
408
409 /**
410  * Queues a new asynchronous job represented by 'callback'
411  * for the 'group' and the 'timeout'.
412  * Jobs are queued FIFO and are possibly executed in parallel
413  * concurrently except for job of the same group that are
414  * executed sequentially in FIFO order.
415  * @param group    The group of the job or NULL when no group.
416  * @param timeout  The maximum execution time in seconds of the job
417  *                 or 0 for unlimited time.
418  * @param callback The function to execute for achieving the job.
419  *                 Its first parameter is either 0 on normal flow
420  *                 or the signal number that broke the normal flow.
421  * @return 0 in case of success or -1 in case of error
422  */
423 int jobs_queue0(
424                 void *group,
425                 int timeout,
426                 void (*callback)(int signum))
427 {
428         return jobs_queue3(group, timeout, (job_cb_t)callback, NULL, NULL, NULL);
429 }
430
431 /**
432  * Queues a new asynchronous job represented by 'callback' and 'arg1'
433  * for the 'group' and the 'timeout'.
434  * Jobs are queued FIFO and are possibly executed in parallel
435  * concurrently except for job of the same group that are
436  * executed sequentially in FIFO order.
437  * @param group    The group of the job or NULL when no group.
438  * @param timeout  The maximum execution time in seconds of the job
439  *                 or 0 for unlimited time.
440  * @param callback The function to execute for achieving the job.
441  *                 Its first parameter is either 0 on normal flow
442  *                 or the signal number that broke the normal flow.
443  *                 The remaining parameter is the parameter 'arg1'
444  *                 given here.
445  * @param arg1     The second argument for 'callback'
446  * @return 0 in case of success or -1 in case of error
447  */
448 int jobs_queue(
449                 void *group,
450                 int timeout,
451                 void (*callback)(int, void*),
452                 void *arg)
453 {
454         return jobs_queue3(group, timeout, (job_cb_t)callback, arg, NULL, NULL);
455 }
456
457 /**
458  * Queues a new asynchronous job represented by 'callback' and 'arg[12]'
459  * for the 'group' and the 'timeout'.
460  * Jobs are queued FIFO and are possibly executed in parallel
461  * concurrently except for job of the same group that are
462  * executed sequentially in FIFO order.
463  * @param group    The group of the job or NULL when no group.
464  * @param timeout  The maximum execution time in seconds of the job
465  *                 or 0 for unlimited time.
466  * @param callback The function to execute for achieving the job.
467  *                 Its first parameter is either 0 on normal flow
468  *                 or the signal number that broke the normal flow.
469  *                 The remaining parameters are the parameters 'arg[12]'
470  *                 given here.
471  * @param arg1     The second argument for 'callback'
472  * @param arg2     The third argument for 'callback'
473  * @return 0 in case of success or -1 in case of error
474  */
475 int jobs_queue2(
476                 void *group,
477                 int timeout,
478                 void (*callback)(int, void*, void*),
479                 void *arg1,
480                 void *arg2)
481 {
482         return jobs_queue3(group, timeout, (job_cb_t)callback, arg1, arg2, NULL);
483 }
484
485 /**
486  * Queues a new asynchronous job represented by 'callback' and 'arg[123]'
487  * for the 'group' and the 'timeout'.
488  * Jobs are queued FIFO and are possibly executed in parallel
489  * concurrently except for job of the same group that are
490  * executed sequentially in FIFO order.
491  * @param group    The group of the job or NULL when no group.
492  * @param timeout  The maximum execution time in seconds of the job
493  *                 or 0 for unlimited time.
494  * @param callback The function to execute for achieving the job.
495  *                 Its first parameter is either 0 on normal flow
496  *                 or the signal number that broke the normal flow.
497  *                 The remaining parameters are the parameters 'arg[123]'
498  *                 given here.
499  * @param arg1     The second argument for 'callback'
500  * @param arg2     The third argument for 'callback'
501  * @param arg3     The forth argument for 'callback'
502  * @return 0 in case of success or -1 in case of error
503  */
504 int jobs_queue3(
505                 void *group,
506                 int timeout,
507                 void (*callback)(int, void*, void *, void*),
508                 void *arg1,
509                 void *arg2,
510                 void *arg3)
511 {
512         const char *info;
513         struct job *job;
514         int rc;
515
516         pthread_mutex_lock(&mutex);
517
518         /* allocates the job */
519         job = job_create(group, timeout, callback, arg1, arg2, arg3);
520         if (!job) {
521                 errno = ENOMEM;
522                 info = "out of memory";
523                 goto error;
524         }
525
526         /* check availability */
527         if (remains == 0) {
528                 errno = EBUSY;
529                 info = "too many jobs";
530                 goto error2;
531         }
532
533         /* start a thread if needed */
534         if (waiting == 0 && started < allowed) {
535                 /* all threads are busy and a new can be started */
536                 rc = start_one_thread();
537                 if (rc < 0 && started == 0) {
538                         info = "can't start first thread";
539                         goto error2;
540                 }
541         }
542
543         /* queues the job */
544         remains--;
545         job_add(job);
546
547         /* signal an existing job */
548         pthread_cond_signal(&cond);
549         pthread_mutex_unlock(&mutex);
550         return 0;
551
552 error2:
553         job->next = free_jobs;
554         free_jobs = job;
555 error:
556         ERROR("can't process job with threads: %s, %m", info);
557         pthread_mutex_unlock(&mutex);
558         return -1;
559 }
560
561 /**
562  * Enter a synchronisation point: activates the job given by 'callback'
563  * @param group the gro
564  */
565 int jobs_enter(
566                 void *group,
567                 int timeout,
568                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
569                 void *closure)
570 {
571         
572         struct job *job;
573         struct thread me;
574
575         pthread_mutex_lock(&mutex);
576
577         /* allocates the job */
578         job = job_create(group, timeout, (job_cb_t)callback, closure, &me, NULL);
579         if (!job) {
580                 ERROR("out of memory");
581                 errno = ENOMEM;
582                 pthread_mutex_unlock(&mutex);
583                 return -1;
584         }
585
586         /* queues the job */
587         job_add(job);
588
589         /* run until stopped */
590         thread_run(&me);
591         pthread_mutex_unlock(&mutex);
592         return 0;
593 }
594
595 int jobs_leave(struct jobloop *jobloop)
596 {
597         struct thread *t;
598         pthread_mutex_lock(&mutex);
599
600         t = threads;
601         while (t && t != (struct thread*)jobloop)
602                 t = t->next;
603         if (!t) {
604                 errno = EINVAL;
605         } else {
606                 t->stop = 1;
607                 if (t->waits)
608                         pthread_cond_broadcast(&cond);
609         }
610         pthread_mutex_unlock(&mutex);
611         return -!t;
612 }
613
614 /**
615  * Gets a sd_event item for the current thread.
616  * @return a sd_event or NULL in case of error
617  */
618 struct sd_event *jobs_get_sd_event()
619 {
620         struct events *events;
621         struct thread *me;
622         int rc;
623
624         pthread_mutex_lock(&mutex);
625
626         /* search events on stack */
627         me = current;
628         while (me && !me->events)
629                 me = me->upper;
630         if (me)
631                 /* return the stacked events */
632                 events = me->events;
633         else {
634                 /* search an available events */
635                 events = events_get();
636                 if (!events) {
637                         /* not found, check if creation possible */
638                         if (nevents >= allowed) {
639                                 ERROR("not possible to add a new event");
640                                 events = NULL;
641                         } else {
642                                 events = malloc(sizeof *events);
643                                 if (events && (rc = sd_event_new(&events->event)) >= 0) {
644                                         if (nevents < started || start_one_thread() >= 0) {
645                                                 events->runs = 0;
646                                                 events->next = first_events;
647                                                 first_events = events;
648                                         } else {
649                                                 ERROR("can't start thread for events");
650                                                 sd_event_unref(events->event);
651                                                 free(events);
652                                                 events = NULL;
653                                         }
654                                 } else {
655                                         if (!events) {
656                                                 ERROR("out of memory");
657                                                 errno = ENOMEM;
658                                         } else {
659                                                 free(events);
660                                                 ERROR("creation of sd_event failed: %m");
661                                                 events = NULL;
662                                                 errno = -rc;
663                                         } 
664                                 }
665                         }
666                 }
667                 if (events) {
668                         /* */
669                         me = current;
670                         if (me) {
671                                 events->runs = 1;
672                                 me->events = events;
673                         } else {
674                                 WARNING("event returned for unknown thread!");
675                         }
676                 }
677         }
678         pthread_mutex_unlock(&mutex);
679         return events ? events->event : NULL;
680 }
681
682 /**
683  * Enter the jobs processing loop.
684  * @param allowed_count Maximum count of thread for jobs including this one
685  * @param start_count   Count of thread to start now, must be lower.
686  * @param waiter_count  Maximum count of jobs that can be waiting.
687  * @param start         The start routine to activate (can't be NULL)
688  * @return 0 in case of success or -1 in case of error.
689  */
690 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)())
691 {
692         int rc, launched;
693         struct thread me;
694         struct job *job;
695
696         assert(allowed_count >= 1);
697         assert(start_count >= 0);
698         assert(waiter_count > 0);
699         assert(start_count <= allowed_count);
700
701         rc = -1;
702         pthread_mutex_lock(&mutex);
703
704         /* check whether already running */
705         if (current || allowed) {
706                 ERROR("thread already started");
707                 errno = EINVAL;
708                 goto error;
709         }
710
711         /* start */
712         if (sig_monitor_init() < 0) {
713                 ERROR("failed to initialise signal handlers");
714                 goto error;
715         }
716
717         /* records the allowed count */
718         allowed = allowed_count;
719         started = 0;
720         waiting = 0;
721         remains = waiter_count;
722
723         /* start at least one thread */
724         launched = 0;
725         while ((launched + 1) < start_count) {
726                 if (start_one_thread() != 0) {
727                         ERROR("Not all threads can be started");
728                         goto error;
729                 }
730                 launched++;
731         }
732
733         /* queue the start job */
734         job = job_create(NULL, 0, (job_cb_t)start, NULL, NULL, NULL);
735         if (!job) {
736                 ERROR("out of memory");
737                 errno = ENOMEM;
738                 goto error;
739         }
740         job_add(job);
741         remains--;
742
743         /* run until end */
744         thread_run(&me);
745         rc = 0;
746 error:
747         pthread_mutex_unlock(&mutex);
748         return rc;
749 }
750
751 /**
752  * Terminate all the threads and cancel all pending jobs.
753  */
754 void jobs_terminate()
755 {
756         struct job *job, *head, *tail;
757         pthread_t me, *others;
758         struct thread *t;
759         int count;
760
761         /* how am i? */
762         me = pthread_self();
763
764         /* request all threads to stop */
765         pthread_mutex_lock(&mutex);
766         allowed = 0;
767
768         /* count the number of threads */
769         count = 0;
770         t = threads;
771         while (t) {
772                 if (!t->upper && !pthread_equal(t->tid, me))
773                         count++;
774                 t = t->next;
775         }
776
777         /* fill the array of threads */
778         others = alloca(count * sizeof *others);
779         count = 0;
780         t = threads;
781         while (t) {
782                 if (!t->upper && !pthread_equal(t->tid, me))
783                         others[count++] = t->tid;
784                 t = t->next;
785         }
786
787         /* stops the threads */
788         t = threads;
789         while (t) {
790                 t->stop = 1;
791                 t = t->next;
792         }
793
794         /* wait the threads */
795         pthread_cond_broadcast(&cond);
796         pthread_mutex_unlock(&mutex);
797         while (count)
798                 pthread_join(others[--count], NULL);
799         pthread_mutex_lock(&mutex);
800
801         /* cancel pending jobs of other threads */
802         remains = 0;
803         head = first_job;
804         first_job = NULL;
805         tail = NULL;
806         while (head) {
807                 /* unlink the job */
808                 job = head;
809                 head = job->next;
810
811                 /* search if job is stacked for current */
812                 t = current;
813                 while (t && t->job != job)
814                         t = t->upper;
815                 if (t) {
816                         /* yes, relink it at end */
817                         if (tail)
818                                 tail->next = job;
819                         else
820                                 first_job = job;
821                         tail = job;
822                         job->next = NULL;
823                 } else {
824                         /* no cancel the job */
825                         pthread_mutex_unlock(&mutex);
826                         sig_monitor(0, job_cancel, job);
827                         free(job);
828                         pthread_mutex_lock(&mutex);
829                 }
830         }
831         pthread_mutex_unlock(&mutex);
832 }
833