jobs: add const qualifier for groups
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <stdint.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <time.h>
25 #include <sys/syscall.h>
26 #include <pthread.h>
27 #include <errno.h>
28 #include <assert.h>
29
30 #include <systemd/sd-event.h>
31
32 #include "jobs.h"
33 #include "sig-monitor.h"
34 #include "verbose.h"
35
36 #if 0
37 #define _alert_ "do you really want to remove monitoring?"
38 #define sig_monitor_init_timeouts()  ((void)0)
39 #define sig_monitor_clean_timeouts() ((void)0)
40 #define sig_monitor(to,cb,arg)       (cb(0,arg))
41 #endif
42
43 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
44 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
45
46 /** Internal shortcut for callback */
47 typedef void (*job_cb_t)(int, void*);
48
49 /** Description of a pending job */
50 struct job
51 {
52         struct job *next;    /**< link to the next job enqueued */
53         const void *group;   /**< group of the request */
54         job_cb_t callback;   /**< processing callback */
55         void *arg;           /**< argument */
56         int timeout;         /**< timeout in second for processing the request */
57         unsigned blocked: 1; /**< is an other request blocking this one ? */
58         unsigned dropped: 1; /**< is removed ? */
59 };
60
61 /** Description of handled event loops */
62 struct events
63 {
64         struct events *next;
65         struct sd_event *event;
66         uint64_t timeout;
67         unsigned used: 1;
68         unsigned runs: 1;
69 };
70
71 /** Description of threads */
72 struct thread
73 {
74         struct thread *next;   /**< next thread of the list */
75         struct thread *upper;  /**< upper same thread */
76         struct job *job;       /**< currently processed job */
77         struct events *events; /**< currently processed job */
78         pthread_t tid;         /**< the thread id */
79         unsigned stop: 1;      /**< stop requested */
80         unsigned lowered: 1;   /**< has a lower same thread */
81         unsigned waits: 1;     /**< is waiting? */
82 };
83
84 /**
85  * Description of synchonous callback
86  */
87 struct sync
88 {
89         struct thread thread;   /**< thread loop data */
90         union {
91                 void (*callback)(int, void*);   /**< the synchronous callback */
92                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
93                                 /**< the entering synchronous routine */
94         };
95         void *arg;              /**< the argument of the callback */
96 };
97
98
99 /* synchronisation of threads */
100 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
101 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
102
103 /* count allowed, started and waiting threads */
104 static int allowed = 0; /** allowed count of threads */
105 static int started = 0; /** started count of threads */
106 static int waiting = 0; /** waiting count of threads */
107 static int remains = 0; /** allowed count of waiting jobs */
108 static int nevents = 0; /** count of events */
109
110 /* list of threads */
111 static struct thread *threads;
112 static _Thread_local struct thread *current;
113
114 /* queue of pending jobs */
115 static struct job *first_job;
116 static struct events *first_events;
117 static struct job *free_jobs;
118
119 /**
120  * Create a new job with the given parameters
121  * @param group    the group of the job
122  * @param timeout  the timeout of the job (0 if none)
123  * @param callback the function that achieves the job
124  * @param arg      the argument of the callback
125  * @return the created job unblock or NULL when no more memory
126  */
127 static struct job *job_create(
128                 const void *group,
129                 int timeout,
130                 job_cb_t callback,
131                 void *arg)
132 {
133         struct job *job;
134
135         /* try recyle existing job */
136         job = free_jobs;
137         if (job)
138                 free_jobs = job->next;
139         else {
140                 /* allocation  without blocking */
141                 pthread_mutex_unlock(&mutex);
142                 job = malloc(sizeof *job);
143                 pthread_mutex_lock(&mutex);
144                 if (!job) {
145                         errno = -ENOMEM;
146                         goto end;
147                 }
148         }
149         /* initialises the job */
150         job->group = group;
151         job->timeout = timeout;
152         job->callback = callback;
153         job->arg = arg;
154         job->blocked = 0;
155         job->dropped = 0;
156 end:
157         return job;
158 }
159
160 /**
161  * Adds 'job' at the end of the list of jobs, marking it
162  * as blocked if an other job with the same group is pending.
163  * @param job the job to add
164  */
165 static void job_add(struct job *job)
166 {
167         const void *group;
168         struct job *ijob, **pjob;
169
170         /* prepare to add */
171         group = job->group;
172         job->next = NULL;
173
174         /* search end and blockers */
175         pjob = &first_job;
176         ijob = first_job;
177         while (ijob) {
178                 if (group && ijob->group == group)
179                         job->blocked = 1;
180                 pjob = &ijob->next;
181                 ijob = ijob->next;
182         }
183
184         /* queue the jobs */
185         *pjob = job;
186 }
187
188 /**
189  * Get the next job to process or NULL if none.
190  * @return the first job that isn't blocked or NULL
191  */
192 static inline struct job *job_get()
193 {
194         struct job *job = first_job;
195         while (job && job->blocked)
196                 job = job->next;
197         return job;
198 }
199
200 /**
201  * Get the next events to process or NULL if none.
202  * @return the first events that isn't running or NULL
203  */
204 static inline struct events *events_get()
205 {
206         struct events *events = first_events;
207         while (events && events->used)
208                 events = events->next;
209         return events;
210 }
211
212 /**
213  * Releases the processed 'job': removes it
214  * from the list of jobs and unblock the first
215  * pending job of the same group if any.
216  * @param job the job to release
217  */
218 static inline void job_release(struct job *job)
219 {
220         struct job *ijob, **pjob;
221         const void *group;
222
223         /* first unqueue the job */
224         pjob = &first_job;
225         ijob = first_job;
226         while (ijob != job) {
227                 pjob = &ijob->next;
228                 ijob = ijob->next;
229         }
230         *pjob = job->next;
231
232         /* then unblock jobs of the same group */
233         group = job->group;
234         if (group) {
235                 ijob = job->next;
236                 while (ijob && ijob->group != group)
237                         ijob = ijob->next;
238                 if (ijob)
239                         ijob->blocked = 0;
240         }
241
242         /* recycle the job */
243         job->next = free_jobs;
244         free_jobs = job;
245 }
246
247 /**
248  * Monitored cancel callback for a job.
249  * This function is called by the monitor
250  * to cancel the job when the safe environment
251  * is set.
252  * @param signum 0 on normal flow or the number
253  *               of the signal that interrupted the normal
254  *               flow, isn't used
255  * @param arg    the job to run
256  */
257 static void job_cancel(int signum, void *arg)
258 {
259         struct job *job = arg;
260         job->callback(SIGABRT, job->arg);
261 }
262
263 /**
264  * Monitored normal callback for events.
265  * This function is called by the monitor
266  * to run the event loop when the safe environment
267  * is set.
268  * @param signum 0 on normal flow or the number
269  *               of the signal that interrupted the normal
270  *               flow
271  * @param arg     the events to run
272  */
273 static void events_call(int signum, void *arg)
274 {
275         struct events *events = arg;
276         if (!signum)
277                 sd_event_run(events->event, events->timeout);
278 }
279
280 /**
281  * Main processing loop of threads processing jobs.
282  * The loop must be called with the mutex locked
283  * and it returns with the mutex locked.
284  * @param me the description of the thread to use
285  * TODO: how are timeout handled when reentering?
286  */
287 static void thread_run(volatile struct thread *me)
288 {
289         struct thread **prv, *thr;
290         struct job *job;
291         struct events *events;
292         uint64_t evto;
293
294         /* initialize description of itself and link it in the list */
295         me->tid = pthread_self();
296         me->stop = 0;
297         me->lowered = 0;
298         me->waits = 0;
299         me->upper = current;
300         if (current) {
301                 current->lowered = 1;
302                 evto = EVENT_TIMEOUT_CHILD;
303                 me->events = current->events;
304         } else {
305                 started++;
306                 sig_monitor_init_timeouts();
307                 evto = EVENT_TIMEOUT_TOP;
308                 me->events = NULL;
309         }
310         me->next = threads;
311         threads = (struct thread*)me;
312         current = (struct thread*)me;
313
314         /* loop until stopped */
315         while (!me->stop) {
316                 /* get a job */
317                 job = job_get(first_job);
318                 if (job) {
319                         /* prepare running the job */
320                         remains++; /* increases count of job that can wait */
321                         job->blocked = 1; /* mark job as blocked */
322                         me->job = job; /* record the job (only for terminate) */
323
324                         /* run the job */
325                         pthread_mutex_unlock(&mutex);
326                         sig_monitor(job->timeout, job->callback, job->arg);
327                         pthread_mutex_lock(&mutex);
328
329                         /* release the run job */
330                         job_release(job);
331
332                         /* release event if any */
333                         events = me->events;
334                         if (events) {
335                                 events->used = 0;
336                                 me->events = NULL;
337                         }
338                 } else {
339                         /* no job, check events */
340                         events = me->events;
341                         if (!events || events->runs)
342                                 events = events_get();
343                         if (events) {
344                                 /* run the events */
345                                 events->used = 1;
346                                 events->runs = 1;
347                                 events->timeout = evto;
348                                 me->events = events;
349                                 pthread_mutex_unlock(&mutex);
350                                 sig_monitor(0, events_call, events);
351                                 pthread_mutex_lock(&mutex);
352                                 events->used = 0;
353                                 events->runs = 0;
354                                 me->events = NULL;
355                                 thr = me->upper;
356                                 while (thr && thr->events == events) {
357                                         thr->events = NULL;
358                                         thr = thr->upper;
359                                 }
360                         } else {
361                                 /* no job and not events */
362                                 waiting++;
363                                 me->waits = 1;
364                                 pthread_cond_wait(&cond, &mutex);
365                                 me->waits = 0;
366                                 waiting--;
367                         }
368                 }
369         }
370
371         /* unlink the current thread and cleanup */
372         prv = &threads;
373         while (*prv != me)
374                 prv = &(*prv)->next;
375         *prv = me->next;
376         current = me->upper;
377         if (current) {
378                 current->lowered = 0;
379         } else {
380                 sig_monitor_clean_timeouts();
381                 started--;
382         }
383 }
384
385 /**
386  * Entry point for created threads.
387  * @param data not used
388  * @return NULL
389  */
390 static void *thread_main(void *data)
391 {
392         struct thread me;
393
394         pthread_mutex_lock(&mutex);
395         thread_run(&me);
396         pthread_mutex_unlock(&mutex);
397         return NULL;
398 }
399
400 /**
401  * Starts a new thread
402  * @return 0 in case of success or -1 in case of error
403  */
404 static int start_one_thread()
405 {
406         pthread_t tid;
407         int rc;
408
409         rc = pthread_create(&tid, NULL, thread_main, NULL);
410         if (rc != 0) {
411                 /* errno = rc; */
412                 WARNING("not able to start thread: %m");
413                 rc = -1;
414         }
415         return rc;
416 }
417
418 /**
419  * Queues a new asynchronous job represented by 'callback' and 'arg'
420  * for the 'group' and the 'timeout'.
421  * Jobs are queued FIFO and are possibly executed in parallel
422  * concurrently except for job of the same group that are
423  * executed sequentially in FIFO order.
424  * @param group    The group of the job or NULL when no group.
425  * @param timeout  The maximum execution time in seconds of the job
426  *                 or 0 for unlimited time.
427  * @param callback The function to execute for achieving the job.
428  *                 Its first parameter is either 0 on normal flow
429  *                 or the signal number that broke the normal flow.
430  *                 The remaining parameter is the parameter 'arg1'
431  *                 given here.
432  * @param arg      The second argument for 'callback'
433  * @return 0 in case of success or -1 in case of error
434  */
435 int jobs_queue(
436                 const void *group,
437                 int timeout,
438                 void (*callback)(int, void*),
439                 void *arg)
440 {
441         const char *info;
442         struct job *job;
443         int rc;
444
445         pthread_mutex_lock(&mutex);
446
447         /* allocates the job */
448         job = job_create(group, timeout, callback, arg);
449         if (!job) {
450                 errno = ENOMEM;
451                 info = "out of memory";
452                 goto error;
453         }
454
455         /* check availability */
456         if (remains == 0) {
457                 errno = EBUSY;
458                 info = "too many jobs";
459                 goto error2;
460         }
461
462         /* start a thread if needed */
463         if (waiting == 0 && started < allowed) {
464                 /* all threads are busy and a new can be started */
465                 rc = start_one_thread();
466                 if (rc < 0 && started == 0) {
467                         info = "can't start first thread";
468                         goto error2;
469                 }
470         }
471
472         /* queues the job */
473         remains--;
474         job_add(job);
475
476         /* signal an existing job */
477         pthread_cond_signal(&cond);
478         pthread_mutex_unlock(&mutex);
479         return 0;
480
481 error2:
482         job->next = free_jobs;
483         free_jobs = job;
484 error:
485         ERROR("can't process job with threads: %s, %m", info);
486         pthread_mutex_unlock(&mutex);
487         return -1;
488 }
489
490 /**
491  * Internal helper function for 'jobs_enter'.
492  * @see jobs_enter, jobs_leave
493  */
494 static void enter_cb(int signum, void *closure)
495 {
496         struct sync *sync = closure;
497         sync->enter(signum, sync->arg, (void*)&sync->thread);
498 }
499
500 /**
501  * Internal helper function for 'jobs_call'.
502  * @see jobs_call
503  */
504 static void call_cb(int signum, void *closure)
505 {
506         struct sync *sync = closure;
507         sync->callback(signum, sync->arg);
508         jobs_leave((void*)&sync->thread);
509 }
510
511 /**
512  * Internal helper for synchronous jobs. It enters
513  * a new thread loop for evaluating the given job
514  * as recorded by the couple 'sync_cb' and 'sync'.
515  * @see jobs_call, jobs_enter, jobs_leave
516  */
517 static int do_sync(
518                 const void *group,
519                 int timeout,
520                 void (*sync_cb)(int signum, void *closure),
521                 struct sync *sync
522 )
523 {
524         struct job *job;
525
526         pthread_mutex_lock(&mutex);
527
528         /* allocates the job */
529         job = job_create(group, timeout, sync_cb, sync);
530         if (!job) {
531                 ERROR("out of memory");
532                 errno = ENOMEM;
533                 pthread_mutex_unlock(&mutex);
534                 return -1;
535         }
536
537         /* queues the job */
538         job_add(job);
539
540         /* run until stopped */
541         thread_run(&sync->thread);
542         pthread_mutex_unlock(&mutex);
543         return 0;
544 }
545
546 /**
547  * Enter a synchronisation point: activates the job given by 'callback'
548  * and 'closure' using 'group' and 'timeout' to control sequencing and
549  * execution time.
550  * @param group the group for sequencing jobs
551  * @param timeout the time in seconds allocated to the job
552  * @param callback the callback that will handle the job.
553  *                 it receives 3 parameters: 'signum' that will be 0
554  *                 on normal flow or the catched signal number in case
555  *                 of interrupted flow, the context 'closure' as given and
556  *                 a 'jobloop' reference that must be used when the job is
557  *                 terminated to unlock the current execution flow.
558  * @param arg the argument to the callback
559  * @return 0 on success or -1 in case of error
560  */
561 int jobs_enter(
562                 const void *group,
563                 int timeout,
564                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
565                 void *closure
566 )
567 {
568         struct sync sync;
569
570         sync.enter = callback;
571         sync.arg = closure;
572         return do_sync(group, timeout, enter_cb, &sync);
573 }
574
575 /**
576  * Unlocks the execution flow designed by 'jobloop'.
577  * @param jobloop indication of the flow to unlock
578  * @return 0 in case of success of -1 on error
579  */
580 int jobs_leave(struct jobloop *jobloop)
581 {
582         struct thread *t;
583
584         pthread_mutex_lock(&mutex);
585         t = threads;
586         while (t && t != (struct thread*)jobloop)
587                 t = t->next;
588         if (!t) {
589                 errno = EINVAL;
590         } else {
591                 t->stop = 1;
592                 if (t->waits)
593                         pthread_cond_broadcast(&cond);
594         }
595         pthread_mutex_unlock(&mutex);
596         return -!t;
597 }
598
599 /**
600  * Calls synchronously the job represented by 'callback' and 'arg1'
601  * for the 'group' and the 'timeout' and waits for its completion.
602  * @param group    The group of the job or NULL when no group.
603  * @param timeout  The maximum execution time in seconds of the job
604  *                 or 0 for unlimited time.
605  * @param callback The function to execute for achieving the job.
606  *                 Its first parameter is either 0 on normal flow
607  *                 or the signal number that broke the normal flow.
608  *                 The remaining parameter is the parameter 'arg1'
609  *                 given here.
610  * @param arg      The second argument for 'callback'
611  * @return 0 in case of success or -1 in case of error
612  */
613 int jobs_call(
614                 const void *group,
615                 int timeout,
616                 void (*callback)(int, void*),
617                 void *arg)
618 {
619         struct sync sync;
620
621         sync.callback = callback;
622         sync.arg = arg;
623
624         return do_sync(group, timeout, call_cb, &sync);
625 }
626
627 /**
628  * Gets a sd_event item for the current thread.
629  * @return a sd_event or NULL in case of error
630  */
631 struct sd_event *jobs_get_sd_event()
632 {
633         struct events *events;
634         struct thread *me;
635         int rc;
636
637         pthread_mutex_lock(&mutex);
638
639         /* search events on stack */
640         me = current;
641         while (me && !me->events)
642                 me = me->upper;
643         if (me)
644                 /* return the stacked events */
645                 events = me->events;
646         else {
647                 /* search an available events */
648                 events = events_get();
649                 if (!events) {
650                         /* not found, check if creation possible */
651                         if (nevents >= allowed) {
652                                 ERROR("not possible to add a new event");
653                                 events = NULL;
654                         } else {
655                                 events = malloc(sizeof *events);
656                                 if (events && (rc = sd_event_new(&events->event)) >= 0) {
657                                         if (nevents < started || start_one_thread() >= 0) {
658                                                 events->used = 0;
659                                                 events->runs = 0;
660                                                 events->next = first_events;
661                                                 first_events = events;
662                                         } else {
663                                                 ERROR("can't start thread for events");
664                                                 sd_event_unref(events->event);
665                                                 free(events);
666                                                 events = NULL;
667                                         }
668                                 } else {
669                                         if (!events) {
670                                                 ERROR("out of memory");
671                                                 errno = ENOMEM;
672                                         } else {
673                                                 free(events);
674                                                 ERROR("creation of sd_event failed: %m");
675                                                 events = NULL;
676                                                 errno = -rc;
677                                         }
678                                 }
679                         }
680                 }
681                 if (events) {
682                         me = current;
683                         if (me) {
684                                 events->used = 1;
685                                 me->events = events;
686                         } else {
687                                 WARNING("event returned for unknown thread!");
688                         }
689                 }
690         }
691         pthread_mutex_unlock(&mutex);
692         return events ? events->event : NULL;
693 }
694
695 /**
696  * Enter the jobs processing loop.
697  * @param allowed_count Maximum count of thread for jobs including this one
698  * @param start_count   Count of thread to start now, must be lower.
699  * @param waiter_count  Maximum count of jobs that can be waiting.
700  * @param start         The start routine to activate (can't be NULL)
701  * @return 0 in case of success or -1 in case of error.
702  */
703 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum))
704 {
705         int rc, launched;
706         struct thread me;
707         struct job *job;
708
709         assert(allowed_count >= 1);
710         assert(start_count >= 0);
711         assert(waiter_count > 0);
712         assert(start_count <= allowed_count);
713
714         rc = -1;
715         pthread_mutex_lock(&mutex);
716
717         /* check whether already running */
718         if (current || allowed) {
719                 ERROR("thread already started");
720                 errno = EINVAL;
721                 goto error;
722         }
723
724         /* start */
725         if (sig_monitor_init() < 0) {
726                 ERROR("failed to initialise signal handlers");
727                 goto error;
728         }
729
730         /* records the allowed count */
731         allowed = allowed_count;
732         started = 0;
733         waiting = 0;
734         remains = waiter_count;
735
736         /* start at least one thread */
737         launched = 0;
738         while ((launched + 1) < start_count) {
739                 if (start_one_thread() != 0) {
740                         ERROR("Not all threads can be started");
741                         goto error;
742                 }
743                 launched++;
744         }
745
746         /* queue the start job */
747         job = job_create(NULL, 0, (job_cb_t)start, NULL);
748         if (!job) {
749                 ERROR("out of memory");
750                 errno = ENOMEM;
751                 goto error;
752         }
753         job_add(job);
754         remains--;
755
756         /* run until end */
757         thread_run(&me);
758         rc = 0;
759 error:
760         pthread_mutex_unlock(&mutex);
761         return rc;
762 }
763
764 /**
765  * Terminate all the threads and cancel all pending jobs.
766  */
767 void jobs_terminate()
768 {
769         struct job *job, *head, *tail;
770         pthread_t me, *others;
771         struct thread *t;
772         int count;
773
774         /* how am i? */
775         me = pthread_self();
776
777         /* request all threads to stop */
778         pthread_mutex_lock(&mutex);
779         allowed = 0;
780
781         /* count the number of threads */
782         count = 0;
783         t = threads;
784         while (t) {
785                 if (!t->upper && !pthread_equal(t->tid, me))
786                         count++;
787                 t = t->next;
788         }
789
790         /* fill the array of threads */
791         others = alloca(count * sizeof *others);
792         count = 0;
793         t = threads;
794         while (t) {
795                 if (!t->upper && !pthread_equal(t->tid, me))
796                         others[count++] = t->tid;
797                 t = t->next;
798         }
799
800         /* stops the threads */
801         t = threads;
802         while (t) {
803                 t->stop = 1;
804                 t = t->next;
805         }
806
807         /* wait the threads */
808         pthread_cond_broadcast(&cond);
809         pthread_mutex_unlock(&mutex);
810         while (count)
811                 pthread_join(others[--count], NULL);
812         pthread_mutex_lock(&mutex);
813
814         /* cancel pending jobs of other threads */
815         remains = 0;
816         head = first_job;
817         first_job = NULL;
818         tail = NULL;
819         while (head) {
820                 /* unlink the job */
821                 job = head;
822                 head = job->next;
823
824                 /* search if job is stacked for current */
825                 t = current;
826                 while (t && t->job != job)
827                         t = t->upper;
828                 if (t) {
829                         /* yes, relink it at end */
830                         if (tail)
831                                 tail->next = job;
832                         else
833                                 first_job = job;
834                         tail = job;
835                         job->next = NULL;
836                 } else {
837                         /* no cancel the job */
838                         pthread_mutex_unlock(&mutex);
839                         sig_monitor(0, job_cancel, job);
840                         free(job);
841                         pthread_mutex_lock(&mutex);
842                 }
843         }
844         pthread_mutex_unlock(&mutex);
845 }
846