Rework the jobs
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <signal.h>
23 #include <time.h>
24 #include <sys/syscall.h>
25 #include <pthread.h>
26 #include <errno.h>
27 #include <assert.h>
28
29 #include <systemd/sd-event.h>
30
31 #include "jobs.h"
32 #include "sig-monitor.h"
33 #include "verbose.h"
34
35 #if 0
36 #define _alert_ "do you really want to remove monitoring?"
37 #define sig_monitor_init_timeouts()  ((void)0)
38 #define sig_monitor_clean_timeouts() ((void)0)
39 #define sig_monitor(to,cb,arg)       (cb(0,arg))
40 #endif
41
42 /** Internal shortcut for callback */
43 typedef void (*job_cb_t)(int, void*, void *, void*);
44
45 /** Description of a pending job */
46 struct job
47 {
48         struct job *next;    /**< link to the next job enqueued */
49         void *group;         /**< group of the request */
50         job_cb_t callback;   /**< processing callback */
51         void *arg1;          /**< first arg */
52         void *arg2;          /**< second arg */
53         void *arg3;          /**< third arg */
54         int timeout;         /**< timeout in second for processing the request */
55         unsigned blocked: 1; /**< is an other request blocking this one ? */
56         unsigned dropped: 1; /**< is removed ? */
57 };
58
59 /** Description of handled event loops */
60 struct events
61 {
62         struct events *next;
63         struct sd_event *event;
64         unsigned runs: 1;
65 };
66
67 /** Description of threads */
68 struct thread
69 {
70         struct thread *next;   /**< next thread of the list */
71         struct thread *upper;  /**< upper same thread */
72         struct job *job;       /**< currently processed job */
73         struct events *events; /**< currently processed job */
74         pthread_t tid;         /**< the thread id */
75         unsigned stop: 1;      /**< stop requested */
76         unsigned lowered: 1;   /**< has a lower same thread */
77         unsigned waits: 1;     /**< is waiting? */
78 };
79
80 /* synchronisation of threads */
81 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
82 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
83
84 /* count allowed, started and waiting threads */
85 static int allowed = 0; /** allowed count of threads */
86 static int started = 0; /** started count of threads */
87 static int waiting = 0; /** waiting count of threads */
88 static int remains = 0; /** allowed count of waiting jobs */
89 static int nevents = 0; /** count of events */
90
91 /* list of threads */
92 static struct thread *threads;
93 static _Thread_local struct thread *current;
94
95 /* queue of pending jobs */
96 static struct job *first_job;
97 static struct events *first_events;
98 static struct job *free_jobs;
99
100 /**
101  * Create a new job with the given parameters
102  * @param group    the group of the job
103  * @param timeout  the timeout of the job (0 if none)
104  * @param callback the function that achieves the job
105  * @param arg1     the first argument of the callback
106  * @param arg2     the second argument of the callback
107  * @param arg3     the third argument of the callback
108  * @return the created job unblock or NULL when no more memory
109  */
110 static struct job *job_create(
111                 void *group,
112                 int timeout,
113                 job_cb_t callback,
114                 void *arg1,
115                 void *arg2,
116                 void *arg3)
117 {
118         struct job *job;
119
120         /* try recyle existing job */
121         job = free_jobs;
122         if (job)
123                 free_jobs = job->next;
124         else {
125                 /* allocation  without blocking */
126                 pthread_mutex_unlock(&mutex);
127                 job = malloc(sizeof *job);
128                 pthread_mutex_lock(&mutex);
129                 if (!job) {
130                         errno = -ENOMEM;
131                         goto end;
132                 }
133         }
134         /* initialises the job */
135         job->group = group;
136         job->timeout = timeout;
137         job->callback = callback;
138         job->arg1 = arg1;
139         job->arg2 = arg2;
140         job->arg3 = arg3;
141         job->blocked = 0;
142         job->dropped = 0;
143 end:
144         return job;
145 }
146
147 /**
148  * Adds 'job' at the end of the list of jobs, marking it
149  * as blocked if an other job with the same group is pending.
150  * @param job the job to add
151  */
152 static void job_add(struct job *job)
153 {
154         void *group;
155         struct job *ijob, **pjob;
156
157         /* prepare to add */
158         group = job->group;
159         job->next = NULL;
160
161         /* search end and blockers */
162         pjob = &first_job;
163         ijob = first_job;
164         while (ijob) {
165                 if (group && ijob->group == group)
166                         job->blocked = 1;
167                 pjob = &ijob->next;
168                 ijob = ijob->next;
169         }
170
171         /* queue the jobs */
172         *pjob = job;
173 }
174
175 /**
176  * Get the next job to process or NULL if none.
177  * @return the first job that isn't blocked or NULL
178  */
179 static inline struct job *job_get()
180 {
181         struct job *job = first_job;
182         while (job && job->blocked)
183                 job = job->next;
184         return job;
185 }
186
187 /**
188  * Get the next events to process or NULL if none.
189  * @return the first events that isn't running or NULL
190  */
191 static inline struct events *events_get()
192 {
193         struct events *events = first_events;
194         while (events && events->runs)
195                 events = events->next;
196         return events;
197 }
198
199 /**
200  * Releases the processed 'job': removes it
201  * from the list of jobs and unblock the first
202  * pending job of the same group if any.
203  * @param job the job to release
204  */
205 static inline void job_release(struct job *job)
206 {
207         struct job *ijob, **pjob;
208         void *group;
209
210         /* first unqueue the job */
211         pjob = &first_job;
212         ijob = first_job;
213         while (ijob != job) {
214                 pjob = &ijob->next;
215                 ijob = ijob->next;
216         }
217         *pjob = job->next;
218
219         /* then unblock jobs of the same group */
220         group = job->group;
221         if (group) {
222                 ijob = job->next;
223                 while (ijob && ijob->group != group)
224                         ijob = ijob->next;
225                 if (ijob)
226                         ijob->blocked = 0;
227         }
228
229         /* recycle the job */
230         job->next = free_jobs;
231         free_jobs = job;
232 }
233
234 /**
235  * Monitored normal callback for a job.
236  * This function is called by the monitor
237  * to run the job when the safe environment
238  * is set.
239  * @param signum 0 on normal flow or the number
240  *               of the signal that interrupted the normal
241  *               flow
242  * @param arg     the job to run
243  */
244 static void job_call(int signum, void *arg)
245 {
246         struct job *job = arg;
247         job->callback(signum, job->arg1, job->arg2, job->arg3);
248 }
249
250 /**
251  * Monitored cancel callback for a job.
252  * This function is called by the monitor
253  * to cancel the job when the safe environment
254  * is set.
255  * @param signum 0 on normal flow or the number
256  *               of the signal that interrupted the normal
257  *               flow, isn't used
258  * @param arg    the job to run
259  */
260 static void job_cancel(int signum, void *arg)
261 {
262         job_call(SIGABRT, arg);
263 }
264
265 /**
266  * Monitored normal callback for events.
267  * This function is called by the monitor
268  * to run the event loop when the safe environment
269  * is set.
270  * @param signum 0 on normal flow or the number
271  *               of the signal that interrupted the normal
272  *               flow
273  * @param arg     the events to run
274  */
275 static void events_call(int signum, void *arg)
276 {
277         struct events *events = arg;
278         if (!signum)
279                 sd_event_run(events->event, (uint64_t) -1);
280 }
281
282 /**
283  * Main processing loop of threads processing jobs.
284  * The loop must be called with the mutex locked
285  * and it returns with the mutex locked.
286  * @param me the description of the thread to use
287  * TODO: how are timeout handled when reentering?
288  */
289 static void thread_run(volatile struct thread *me)
290 {
291         struct thread **prv;
292         struct job *job;
293         struct events *events;
294
295         /* initialize description of itself and link it in the list */
296         me->tid = pthread_self();
297         me->stop = 0;
298         me->lowered = 0;
299         me->waits = 0;
300         me->upper = current;
301         if (current)
302                 current->lowered = 1;
303         else
304                 sig_monitor_init_timeouts();
305         current = (struct thread*)me;
306         me->next = threads;
307         threads = (struct thread*)me;
308         started++;
309
310         NOTICE("job thread starting %d(/%d) %s", started, allowed, me->upper ? "child" : "parent");
311
312         /* loop until stopped */
313         me->events = NULL;
314         while (!me->stop) {
315                 /* get a job */
316                 job = job_get(first_job);
317                 if (job) {
318                         /* prepare running the job */
319                         remains++; /* increases count of job that can wait */
320                         job->blocked = 1; /* mark job as blocked */
321                         me->job = job; /* record the job (only for terminate) */
322
323                         /* run the job */
324                         pthread_mutex_unlock(&mutex);
325                         sig_monitor(job->timeout, job_call, job);
326                         pthread_mutex_lock(&mutex);
327
328                         /* release the run job */
329                         job_release(job);
330
331                         /* release event if any */
332                         events = me->events;
333                         if (events) {
334                                 events->runs = 0;
335                                 me->events = NULL;
336                         }
337                 } else {
338                         /* no job, check events */
339                         events = events_get();
340                         if (events) {
341                                 /* run the events */
342                                 events->runs = 1;
343                                 me->events = events;
344                                 pthread_mutex_unlock(&mutex);
345                                 sig_monitor(0, events_call, events);
346                                 pthread_mutex_lock(&mutex);
347                                 events->runs = 0;
348                                 me->events = NULL;
349                         } else {
350                                 /* no job and not events */
351                                 waiting++;
352                                 me->waits = 1;
353                                 pthread_cond_wait(&cond, &mutex);
354                                 me->waits = 0;
355                                 waiting--;
356                         }
357                 }
358         }
359         NOTICE("job thread stoping %d(/%d) %s", started, allowed, me->upper ? "child" : "parent");
360
361         /* unlink the current thread and cleanup */
362         started--;
363         prv = &threads;
364         while (*prv != me)
365                 prv = &(*prv)->next;
366         *prv = me->next;
367         current = me->upper;
368         if (current)
369                 current->lowered = 0;
370         else
371                 sig_monitor_clean_timeouts();
372 }
373
374 /**
375  * Entry point for created threads.
376  * @param data not used
377  * @return NULL
378  */
379 static void *thread_main(void *data)
380 {
381         struct thread me;
382
383         pthread_mutex_lock(&mutex);
384         thread_run(&me);
385         pthread_mutex_unlock(&mutex);
386         return NULL;
387 }
388
389 /**
390  * Starts a new thread
391  * @return 0 in case of success or -1 in case of error
392  */
393 static int start_one_thread()
394 {
395         pthread_t tid;
396         int rc;
397
398         rc = pthread_create(&tid, NULL, thread_main, NULL);
399         if (rc != 0) {
400                 /* errno = rc; */
401                 WARNING("not able to start thread: %m");
402                 rc = -1;
403         }
404         return rc;
405 }
406
407 /**
408  * Queues a new asynchronous job represented by 'callback'
409  * for the 'group' and the 'timeout'.
410  * Jobs are queued FIFO and are possibly executed in parallel
411  * concurrently except for job of the same group that are
412  * executed sequentially in FIFO order.
413  * @param group    The group of the job or NULL when no group.
414  * @param timeout  The maximum execution time in seconds of the job
415  *                 or 0 for unlimited time.
416  * @param callback The function to execute for achieving the job.
417  *                 Its first parameter is either 0 on normal flow
418  *                 or the signal number that broke the normal flow.
419  * @return 0 in case of success or -1 in case of error
420  */
421 int jobs_queue0(
422                 void *group,
423                 int timeout,
424                 void (*callback)(int signum))
425 {
426         return jobs_queue3(group, timeout, (job_cb_t)callback, NULL, NULL, NULL);
427 }
428
429 /**
430  * Queues a new asynchronous job represented by 'callback' and 'arg1'
431  * for the 'group' and the 'timeout'.
432  * Jobs are queued FIFO and are possibly executed in parallel
433  * concurrently except for job of the same group that are
434  * executed sequentially in FIFO order.
435  * @param group    The group of the job or NULL when no group.
436  * @param timeout  The maximum execution time in seconds of the job
437  *                 or 0 for unlimited time.
438  * @param callback The function to execute for achieving the job.
439  *                 Its first parameter is either 0 on normal flow
440  *                 or the signal number that broke the normal flow.
441  *                 The remaining parameter is the parameter 'arg1'
442  *                 given here.
443  * @param arg1     The second argument for 'callback'
444  * @return 0 in case of success or -1 in case of error
445  */
446 int jobs_queue(
447                 void *group,
448                 int timeout,
449                 void (*callback)(int, void*),
450                 void *arg)
451 {
452         return jobs_queue3(group, timeout, (job_cb_t)callback, arg, NULL, NULL);
453 }
454
455 /**
456  * Queues a new asynchronous job represented by 'callback' and 'arg[12]'
457  * for the 'group' and the 'timeout'.
458  * Jobs are queued FIFO and are possibly executed in parallel
459  * concurrently except for job of the same group that are
460  * executed sequentially in FIFO order.
461  * @param group    The group of the job or NULL when no group.
462  * @param timeout  The maximum execution time in seconds of the job
463  *                 or 0 for unlimited time.
464  * @param callback The function to execute for achieving the job.
465  *                 Its first parameter is either 0 on normal flow
466  *                 or the signal number that broke the normal flow.
467  *                 The remaining parameters are the parameters 'arg[12]'
468  *                 given here.
469  * @param arg1     The second argument for 'callback'
470  * @param arg2     The third argument for 'callback'
471  * @return 0 in case of success or -1 in case of error
472  */
473 int jobs_queue2(
474                 void *group,
475                 int timeout,
476                 void (*callback)(int, void*, void*),
477                 void *arg1,
478                 void *arg2)
479 {
480         return jobs_queue3(group, timeout, (job_cb_t)callback, arg1, arg2, NULL);
481 }
482
483 /**
484  * Queues a new asynchronous job represented by 'callback' and 'arg[123]'
485  * for the 'group' and the 'timeout'.
486  * Jobs are queued FIFO and are possibly executed in parallel
487  * concurrently except for job of the same group that are
488  * executed sequentially in FIFO order.
489  * @param group    The group of the job or NULL when no group.
490  * @param timeout  The maximum execution time in seconds of the job
491  *                 or 0 for unlimited time.
492  * @param callback The function to execute for achieving the job.
493  *                 Its first parameter is either 0 on normal flow
494  *                 or the signal number that broke the normal flow.
495  *                 The remaining parameters are the parameters 'arg[123]'
496  *                 given here.
497  * @param arg1     The second argument for 'callback'
498  * @param arg2     The third argument for 'callback'
499  * @param arg3     The forth argument for 'callback'
500  * @return 0 in case of success or -1 in case of error
501  */
502 int jobs_queue3(
503                 void *group,
504                 int timeout,
505                 void (*callback)(int, void*, void *, void*),
506                 void *arg1,
507                 void *arg2,
508                 void *arg3)
509 {
510         const char *info;
511         struct job *job;
512         int rc;
513
514         pthread_mutex_lock(&mutex);
515
516         /* allocates the job */
517         job = job_create(group, timeout, callback, arg1, arg2, arg3);
518         if (!job) {
519                 errno = ENOMEM;
520                 info = "out of memory";
521                 goto error;
522         }
523
524         /* check availability */
525         if (remains == 0) {
526                 errno = EBUSY;
527                 info = "too many jobs";
528                 goto error2;
529         }
530
531         /* start a thread if needed */
532         if (waiting == 0 && started < allowed) {
533                 /* all threads are busy and a new can be started */
534                 rc = start_one_thread();
535                 if (rc < 0 && started == 0) {
536                         info = "can't start first thread";
537                         goto error2;
538                 }
539         }
540
541         /* queues the job */
542         remains--;
543         job_add(job);
544
545         /* signal an existing job */
546         pthread_cond_signal(&cond);
547         pthread_mutex_unlock(&mutex);
548         return 0;
549
550 error2:
551         job->next = free_jobs;
552         free_jobs = job;
553 error:
554         ERROR("can't process job with threads: %s, %m", info);
555         pthread_mutex_unlock(&mutex);
556         return -1;
557 }
558
559 /**
560  * Enter a synchronisation point: activates the job given by 'callback'
561  * @param group the gro
562  */
563 int jobs_enter(
564                 void *group,
565                 int timeout,
566                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
567                 void *closure)
568 {
569         
570         struct job *job;
571         struct thread me;
572
573         pthread_mutex_lock(&mutex);
574
575         /* allocates the job */
576         job = job_create(group, timeout, (job_cb_t)callback, closure, &me, NULL);
577         if (!job) {
578                 ERROR("out of memory");
579                 errno = ENOMEM;
580                 pthread_mutex_unlock(&mutex);
581                 return -1;
582         }
583
584         /* queues the job */
585         job_add(job);
586
587         /* run until stopped */
588         thread_run(&me);
589         pthread_mutex_unlock(&mutex);
590         return 0;
591 }
592
593 int jobs_leave(struct jobloop *jobloop)
594 {
595         struct thread *t;
596         pthread_mutex_lock(&mutex);
597
598         t = threads;
599         while (t && t != (struct thread*)jobloop)
600                 t = t->next;
601         if (!t) {
602                 errno = EINVAL;
603         } else {
604                 t->stop = 1;
605                 if (t->waits)
606                         pthread_cond_broadcast(&cond);
607         }
608         pthread_mutex_unlock(&mutex);
609         return -!t;
610 }
611
612 /**
613  * Gets a sd_event item for the current thread.
614  * @return a sd_event or NULL in case of error
615  */
616 struct sd_event *jobs_get_sd_event()
617 {
618         struct events *events;
619         struct thread *me;
620         int rc;
621
622         pthread_mutex_lock(&mutex);
623
624         /* search events on stack */
625         me = current;
626         while (me && !me->events)
627                 me = me->upper;
628         if (me)
629                 /* return the stacked events */
630                 events = me->events;
631         else {
632                 /* search an available events */
633                 events = events_get();
634                 if (!events) {
635                         /* not found, check if creation possible */
636                         if (nevents >= allowed) {
637                                 ERROR("not possible to add a new event");
638                                 events = NULL;
639                         } else {
640                                 events = malloc(sizeof *events);
641                                 if (events && (rc = sd_event_new(&events->event)) >= 0) {
642                                         if (nevents < started || start_one_thread() >= 0) {
643                                                 events->runs = 0;
644                                                 events->next = first_events;
645                                                 first_events = events;
646                                         } else {
647                                                 ERROR("can't start thread for events");
648                                                 sd_event_unref(events->event);
649                                                 free(events);
650                                                 events = NULL;
651                                         }
652                                 } else {
653                                         if (!events) {
654                                                 ERROR("out of memory");
655                                                 errno = ENOMEM;
656                                         } else {
657                                                 free(events);
658                                                 ERROR("creation of sd_event failed: %m");
659                                                 events = NULL;
660                                                 errno = -rc;
661                                         } 
662                                 }
663                         }
664                 }
665                 if (events) {
666                         /* */
667                         me = current;
668                         if (me) {
669                                 events->runs = 1;
670                                 me->events = events;
671                         } else {
672                                 WARNING("event returned for unknown thread!");
673                         }
674                 }
675         }
676         pthread_mutex_unlock(&mutex);
677         return events ? events->event : NULL;
678 }
679
680 /**
681  * Enter the jobs processing loop.
682  * @param allowed_count Maximum count of thread for jobs including this one
683  * @param start_count   Count of thread to start now, must be lower.
684  * @param waiter_count  Maximum count of jobs that can be waiting.
685  * @param start         The start routine to activate (can't be NULL)
686  * @return 0 in case of success or -1 in case of error.
687  */
688 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)())
689 {
690         int rc, launched;
691         struct thread me;
692         struct job *job;
693
694         assert(allowed_count >= 1);
695         assert(start_count >= 0);
696         assert(waiter_count > 0);
697         assert(start_count <= allowed_count);
698
699         rc = -1;
700         pthread_mutex_lock(&mutex);
701
702         /* check whether already running */
703         if (current || allowed) {
704                 ERROR("thread already started");
705                 errno = EINVAL;
706                 goto error;
707         }
708
709         /* start */
710         if (sig_monitor_init() < 0) {
711                 ERROR("failed to initialise signal handlers");
712                 goto error;
713         }
714
715         /* records the allowed count */
716         allowed = allowed_count;
717         started = 0;
718         waiting = 0;
719         remains = waiter_count;
720
721         /* start at least one thread */
722         launched = 0;
723         while ((launched + 1) < start_count) {
724                 if (start_one_thread() != 0) {
725                         ERROR("Not all threads can be started");
726                         goto error;
727                 }
728                 launched++;
729         }
730
731         /* queue the start job */
732         job = job_create(NULL, 0, (job_cb_t)start, NULL, NULL, NULL);
733         if (!job) {
734                 ERROR("out of memory");
735                 errno = ENOMEM;
736                 goto error;
737         }
738         job_add(job);
739         remains--;
740
741         /* run until end */
742         thread_run(&me);
743         rc = 0;
744 error:
745         pthread_mutex_unlock(&mutex);
746         return rc;
747 }
748
749 /**
750  * Terminate all the threads and cancel all pending jobs.
751  */
752 void jobs_terminate()
753 {
754         struct job *job, *head, *tail;
755         pthread_t me, *others;
756         struct thread *t;
757         int count;
758
759         /* how am i? */
760         me = pthread_self();
761
762         /* request all threads to stop */
763         pthread_mutex_lock(&mutex);
764         allowed = 0;
765
766         /* count the number of threads */
767         count = 0;
768         t = threads;
769         while (t) {
770                 if (!t->upper && !pthread_equal(t->tid, me))
771                         count++;
772                 t = t->next;
773         }
774
775         /* fill the array of threads */
776         others = alloca(count * sizeof *others);
777         count = 0;
778         t = threads;
779         while (t) {
780                 if (!t->upper && !pthread_equal(t->tid, me))
781                         others[count++] = t->tid;
782                 t = t->next;
783         }
784
785         /* stops the threads */
786         t = threads;
787         while (t) {
788                 t->stop = 1;
789                 t = t->next;
790         }
791
792         /* wait the threads */
793         pthread_cond_broadcast(&cond);
794         pthread_mutex_unlock(&mutex);
795         while (count)
796                 pthread_join(others[--count], NULL);
797         pthread_mutex_lock(&mutex);
798
799         /* cancel pending jobs of other threads */
800         remains = 0;
801         head = first_job;
802         first_job = NULL;
803         tail = NULL;
804         while (head) {
805                 /* unlink the job */
806                 job = head;
807                 head = job->next;
808
809                 /* search if job is stacked for current */
810                 t = current;
811                 while (t && t->job != job)
812                         t = t->upper;
813                 if (t) {
814                         /* yes, relink it at end */
815                         if (tail)
816                                 tail->next = job;
817                         else
818                                 first_job = job;
819                         tail = job;
820                         job->next = NULL;
821                 } else {
822                         /* no cancel the job */
823                         pthread_mutex_unlock(&mutex);
824                         sig_monitor(0, job_cancel, job);
825                         free(job);
826                         pthread_mutex_lock(&mutex);
827                 }
828         }
829         pthread_mutex_unlock(&mutex);
830 }
831