jobs: fix starve
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <stdint.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <time.h>
25 #include <sys/syscall.h>
26 #include <pthread.h>
27 #include <errno.h>
28 #include <assert.h>
29
30 #include <systemd/sd-event.h>
31
32 #include "jobs.h"
33 #include "sig-monitor.h"
34 #include "verbose.h"
35
36 #if 0
37 #define _alert_ "do you really want to remove monitoring?"
38 #define sig_monitor_init_timeouts()  ((void)0)
39 #define sig_monitor_clean_timeouts() ((void)0)
40 #define sig_monitor(to,cb,arg)       (cb(0,arg))
41 #endif
42
43 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
44 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
45
46 /** Internal shortcut for callback */
47 typedef void (*job_cb_t)(int, void*);
48
49 /** Description of a pending job */
50 struct job
51 {
52         struct job *next;    /**< link to the next job enqueued */
53         void *group;         /**< group of the request */
54         job_cb_t callback;   /**< processing callback */
55         void *arg;           /**< argument */
56         int timeout;         /**< timeout in second for processing the request */
57         unsigned blocked: 1; /**< is an other request blocking this one ? */
58         unsigned dropped: 1; /**< is removed ? */
59 };
60
61 /** Description of handled event loops */
62 struct events
63 {
64         struct events *next;
65         struct sd_event *event;
66         uint64_t timeout;
67         unsigned used: 1;
68         unsigned runs: 1;
69 };
70
71 /** Description of threads */
72 struct thread
73 {
74         struct thread *next;   /**< next thread of the list */
75         struct thread *upper;  /**< upper same thread */
76         struct job *job;       /**< currently processed job */
77         struct events *events; /**< currently processed job */
78         pthread_t tid;         /**< the thread id */
79         unsigned stop: 1;      /**< stop requested */
80         unsigned lowered: 1;   /**< has a lower same thread */
81         unsigned waits: 1;     /**< is waiting? */
82 };
83
84 /**
85  * Description of synchonous callback
86  */
87 struct sync
88 {
89         struct thread thread;   /**< thread loop data */
90         union {
91                 void (*callback)(int, void*);   /**< the synchronous callback */
92                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
93                                 /**< the entering synchronous routine */
94         };
95         void *arg;              /**< the argument of the callback */
96 };
97
98
99 /* synchronisation of threads */
100 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
101 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
102
103 /* count allowed, started and waiting threads */
104 static int allowed = 0; /** allowed count of threads */
105 static int started = 0; /** started count of threads */
106 static int waiting = 0; /** waiting count of threads */
107 static int remains = 0; /** allowed count of waiting jobs */
108 static int nevents = 0; /** count of events */
109
110 /* list of threads */
111 static struct thread *threads;
112 static _Thread_local struct thread *current;
113
114 /* queue of pending jobs */
115 static struct job *first_job;
116 static struct events *first_events;
117 static struct job *free_jobs;
118
119 /**
120  * Create a new job with the given parameters
121  * @param group    the group of the job
122  * @param timeout  the timeout of the job (0 if none)
123  * @param callback the function that achieves the job
124  * @param arg      the argument of the callback
125  * @return the created job unblock or NULL when no more memory
126  */
127 static struct job *job_create(
128                 void *group,
129                 int timeout,
130                 job_cb_t callback,
131                 void *arg)
132 {
133         struct job *job;
134
135         /* try recyle existing job */
136         job = free_jobs;
137         if (job)
138                 free_jobs = job->next;
139         else {
140                 /* allocation  without blocking */
141                 pthread_mutex_unlock(&mutex);
142                 job = malloc(sizeof *job);
143                 pthread_mutex_lock(&mutex);
144                 if (!job) {
145                         errno = -ENOMEM;
146                         goto end;
147                 }
148         }
149         /* initialises the job */
150         job->group = group;
151         job->timeout = timeout;
152         job->callback = callback;
153         job->arg = arg;
154         job->blocked = 0;
155         job->dropped = 0;
156 end:
157         return job;
158 }
159
160 /**
161  * Adds 'job' at the end of the list of jobs, marking it
162  * as blocked if an other job with the same group is pending.
163  * @param job the job to add
164  */
165 static void job_add(struct job *job)
166 {
167         void *group;
168         struct job *ijob, **pjob;
169
170         /* prepare to add */
171         group = job->group;
172         job->next = NULL;
173
174         /* search end and blockers */
175         pjob = &first_job;
176         ijob = first_job;
177         while (ijob) {
178                 if (group && ijob->group == group)
179                         job->blocked = 1;
180                 pjob = &ijob->next;
181                 ijob = ijob->next;
182         }
183
184         /* queue the jobs */
185         *pjob = job;
186 }
187
188 /**
189  * Get the next job to process or NULL if none.
190  * @return the first job that isn't blocked or NULL
191  */
192 static inline struct job *job_get()
193 {
194         struct job *job = first_job;
195         while (job && job->blocked)
196                 job = job->next;
197         return job;
198 }
199
200 /**
201  * Get the next events to process or NULL if none.
202  * @return the first events that isn't running or NULL
203  */
204 static inline struct events *events_get()
205 {
206         struct events *events = first_events;
207         while (events && events->used)
208                 events = events->next;
209         return events;
210 }
211
212 /**
213  * Releases the processed 'job': removes it
214  * from the list of jobs and unblock the first
215  * pending job of the same group if any.
216  * @param job the job to release
217  */
218 static inline void job_release(struct job *job)
219 {
220         struct job *ijob, **pjob;
221         void *group;
222
223         /* first unqueue the job */
224         pjob = &first_job;
225         ijob = first_job;
226         while (ijob != job) {
227                 pjob = &ijob->next;
228                 ijob = ijob->next;
229         }
230         *pjob = job->next;
231
232         /* then unblock jobs of the same group */
233         group = job->group;
234         if (group) {
235                 ijob = job->next;
236                 while (ijob && ijob->group != group)
237                         ijob = ijob->next;
238                 if (ijob)
239                         ijob->blocked = 0;
240         }
241
242         /* recycle the job */
243         job->next = free_jobs;
244         free_jobs = job;
245 }
246
247 /**
248  * Monitored cancel callback for a job.
249  * This function is called by the monitor
250  * to cancel the job when the safe environment
251  * is set.
252  * @param signum 0 on normal flow or the number
253  *               of the signal that interrupted the normal
254  *               flow, isn't used
255  * @param arg    the job to run
256  */
257 static void job_cancel(int signum, void *arg)
258 {
259         struct job *job = arg;
260         job->callback(SIGABRT, job->arg);
261 }
262
263 /**
264  * Monitored normal callback for events.
265  * This function is called by the monitor
266  * to run the event loop when the safe environment
267  * is set.
268  * @param signum 0 on normal flow or the number
269  *               of the signal that interrupted the normal
270  *               flow
271  * @param arg     the events to run
272  */
273 static void events_call(int signum, void *arg)
274 {
275         struct events *events = arg;
276         if (!signum)
277                 sd_event_run(events->event, events->timeout);
278 }
279
280 /**
281  * Main processing loop of threads processing jobs.
282  * The loop must be called with the mutex locked
283  * and it returns with the mutex locked.
284  * @param me the description of the thread to use
285  * TODO: how are timeout handled when reentering?
286  */
287 static void thread_run(volatile struct thread *me)
288 {
289         struct thread **prv, *thr;
290         struct job *job;
291         struct events *events;
292         uint64_t evto;
293
294         /* initialize description of itself and link it in the list */
295         me->tid = pthread_self();
296         me->stop = 0;
297         me->lowered = 0;
298         me->waits = 0;
299         me->upper = current;
300         if (current) {
301                 current->lowered = 1;
302                 evto = EVENT_TIMEOUT_CHILD;
303         } else {
304                 started++;
305                 sig_monitor_init_timeouts();
306                 evto = EVENT_TIMEOUT_TOP;
307         }
308         me->next = threads;
309         threads = (struct thread*)me;
310         current = (struct thread*)me;
311
312         /* loop until stopped */
313         me->events = NULL;
314         while (!me->stop) {
315                 /* get a job */
316                 job = job_get(first_job);
317                 if (job) {
318                         /* prepare running the job */
319                         remains++; /* increases count of job that can wait */
320                         job->blocked = 1; /* mark job as blocked */
321                         me->job = job; /* record the job (only for terminate) */
322
323                         /* run the job */
324                         pthread_mutex_unlock(&mutex);
325                         sig_monitor(job->timeout, job->callback, job->arg);
326                         pthread_mutex_lock(&mutex);
327
328                         /* release the run job */
329                         job_release(job);
330
331                         /* release event if any */
332                         events = me->events;
333                         if (events) {
334                                 events->used = 0;
335                                 me->events = NULL;
336                         }
337                 } else {
338                         /* no job, check events */
339                         thr = (struct thread*)me;
340                         events = NULL;
341                         while (thr && !(events = thr->events))
342                                 thr = thr->upper;
343                         if (events && !events->runs) {
344                                 /* run the events */
345                                 events->runs = 1;
346                                 events->timeout = evto;
347                                 me->events = events;
348                                 pthread_mutex_unlock(&mutex);
349                                 sig_monitor(0, events_call, events);
350                                 pthread_mutex_lock(&mutex);
351                                 events->runs = 0;
352                                 me->events = NULL;
353                         } else {
354                                 /* no owned event, check events */
355                                 events = events_get();
356                                 if (events) {
357                                         /* run the events */
358                                         events->used = 1;
359                                         events->runs = 1;
360                                         events->timeout = evto;
361                                         me->events = events;
362                                         pthread_mutex_unlock(&mutex);
363                                         sig_monitor(0, events_call, events);
364                                         pthread_mutex_lock(&mutex);
365                                         events->used = 0;
366                                         events->runs = 0;
367                                         me->events = NULL;
368                                 } else {
369                                         /* no job and not events */
370                                         waiting++;
371                                         me->waits = 1;
372                                         pthread_cond_wait(&cond, &mutex);
373                                         me->waits = 0;
374                                         waiting--;
375                                 }
376                         }
377                 }
378         }
379
380         /* unlink the current thread and cleanup */
381         prv = &threads;
382         while (*prv != me)
383                 prv = &(*prv)->next;
384         *prv = me->next;
385         current = me->upper;
386         if (current) {
387                 current->lowered = 0;
388         } else {
389                 sig_monitor_clean_timeouts();
390                 started--;
391         }
392 }
393
394 /**
395  * Entry point for created threads.
396  * @param data not used
397  * @return NULL
398  */
399 static void *thread_main(void *data)
400 {
401         struct thread me;
402
403         pthread_mutex_lock(&mutex);
404         thread_run(&me);
405         pthread_mutex_unlock(&mutex);
406         return NULL;
407 }
408
409 /**
410  * Starts a new thread
411  * @return 0 in case of success or -1 in case of error
412  */
413 static int start_one_thread()
414 {
415         pthread_t tid;
416         int rc;
417
418         rc = pthread_create(&tid, NULL, thread_main, NULL);
419         if (rc != 0) {
420                 /* errno = rc; */
421                 WARNING("not able to start thread: %m");
422                 rc = -1;
423         }
424         return rc;
425 }
426
427 /**
428  * Queues a new asynchronous job represented by 'callback' and 'arg'
429  * for the 'group' and the 'timeout'.
430  * Jobs are queued FIFO and are possibly executed in parallel
431  * concurrently except for job of the same group that are
432  * executed sequentially in FIFO order.
433  * @param group    The group of the job or NULL when no group.
434  * @param timeout  The maximum execution time in seconds of the job
435  *                 or 0 for unlimited time.
436  * @param callback The function to execute for achieving the job.
437  *                 Its first parameter is either 0 on normal flow
438  *                 or the signal number that broke the normal flow.
439  *                 The remaining parameter is the parameter 'arg1'
440  *                 given here.
441  * @param arg      The second argument for 'callback'
442  * @return 0 in case of success or -1 in case of error
443  */
444 int jobs_queue(
445                 void *group,
446                 int timeout,
447                 void (*callback)(int, void*),
448                 void *arg)
449 {
450         const char *info;
451         struct job *job;
452         int rc;
453
454         pthread_mutex_lock(&mutex);
455
456         /* allocates the job */
457         job = job_create(group, timeout, callback, arg);
458         if (!job) {
459                 errno = ENOMEM;
460                 info = "out of memory";
461                 goto error;
462         }
463
464         /* check availability */
465         if (remains == 0) {
466                 errno = EBUSY;
467                 info = "too many jobs";
468                 goto error2;
469         }
470
471         /* start a thread if needed */
472         if (waiting == 0 && started < allowed) {
473                 /* all threads are busy and a new can be started */
474                 rc = start_one_thread();
475                 if (rc < 0 && started == 0) {
476                         info = "can't start first thread";
477                         goto error2;
478                 }
479         }
480
481         /* queues the job */
482         remains--;
483         job_add(job);
484
485         /* signal an existing job */
486         pthread_cond_signal(&cond);
487         pthread_mutex_unlock(&mutex);
488         return 0;
489
490 error2:
491         job->next = free_jobs;
492         free_jobs = job;
493 error:
494         ERROR("can't process job with threads: %s, %m", info);
495         pthread_mutex_unlock(&mutex);
496         return -1;
497 }
498
499 /**
500  * Internal helper function for 'jobs_enter'.
501  * @see jobs_enter, jobs_leave
502  */
503 static void enter_cb(int signum, void *closure)
504 {
505         struct sync *sync = closure;
506         sync->enter(signum, sync->arg, (void*)&sync->thread);
507 }
508
509 /**
510  * Internal helper function for 'jobs_call'.
511  * @see jobs_call
512  */
513 static void call_cb(int signum, void *closure)
514 {
515         struct sync *sync = closure;
516         sync->callback(signum, sync->arg);
517         jobs_leave((void*)&sync->thread);
518 }
519
520 /**
521  * Internal helper for synchronous jobs. It enters
522  * a new thread loop for evaluating the given job
523  * as recorded by the couple 'sync_cb' and 'sync'.
524  * @see jobs_call, jobs_enter, jobs_leave
525  */
526 static int do_sync(
527                 void *group,
528                 int timeout,
529                 void (*sync_cb)(int signum, void *closure),
530                 struct sync *sync
531 )
532 {
533         struct job *job;
534
535         pthread_mutex_lock(&mutex);
536
537         /* allocates the job */
538         job = job_create(group, timeout, sync_cb, sync);
539         if (!job) {
540                 ERROR("out of memory");
541                 errno = ENOMEM;
542                 pthread_mutex_unlock(&mutex);
543                 return -1;
544         }
545
546         /* queues the job */
547         job_add(job);
548
549         /* run until stopped */
550         thread_run(&sync->thread);
551         pthread_mutex_unlock(&mutex);
552         return 0;
553 }
554
555 /**
556  * Enter a synchronisation point: activates the job given by 'callback'
557  * and 'closure' using 'group' and 'timeout' to control sequencing and
558  * execution time.
559  * @param group the group for sequencing jobs
560  * @param timeout the time in seconds allocated to the job
561  * @param callback the callback that will handle the job.
562  *                 it receives 3 parameters: 'signum' that will be 0
563  *                 on normal flow or the catched signal number in case
564  *                 of interrupted flow, the context 'closure' as given and
565  *                 a 'jobloop' reference that must be used when the job is
566  *                 terminated to unlock the current execution flow.
567  * @param arg the argument to the callback
568  * @return 0 on success or -1 in case of error
569  */
570 int jobs_enter(
571                 void *group,
572                 int timeout,
573                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
574                 void *closure
575 )
576 {
577         struct sync sync;
578
579         sync.enter = callback;
580         sync.arg = closure;
581         return do_sync(group, timeout, enter_cb, &sync);
582 }
583
584 /**
585  * Unlocks the execution flow designed by 'jobloop'.
586  * @param jobloop indication of the flow to unlock
587  * @return 0 in case of success of -1 on error
588  */
589 int jobs_leave(struct jobloop *jobloop)
590 {
591         struct thread *t;
592
593         pthread_mutex_lock(&mutex);
594         t = threads;
595         while (t && t != (struct thread*)jobloop)
596                 t = t->next;
597         if (!t) {
598                 errno = EINVAL;
599         } else {
600                 t->stop = 1;
601                 if (t->waits)
602                         pthread_cond_broadcast(&cond);
603         }
604         pthread_mutex_unlock(&mutex);
605         return -!t;
606 }
607
608 /**
609  * Calls synchronously the job represented by 'callback' and 'arg1'
610  * for the 'group' and the 'timeout' and waits for its completion.
611  * @param group    The group of the job or NULL when no group.
612  * @param timeout  The maximum execution time in seconds of the job
613  *                 or 0 for unlimited time.
614  * @param callback The function to execute for achieving the job.
615  *                 Its first parameter is either 0 on normal flow
616  *                 or the signal number that broke the normal flow.
617  *                 The remaining parameter is the parameter 'arg1'
618  *                 given here.
619  * @param arg      The second argument for 'callback'
620  * @return 0 in case of success or -1 in case of error
621  */
622 int jobs_call(
623                 void *group,
624                 int timeout,
625                 void (*callback)(int, void*),
626                 void *arg)
627 {
628         struct sync sync;
629
630         sync.callback = callback;
631         sync.arg = arg;
632
633         return do_sync(group, timeout, call_cb, &sync);
634 }
635
636 /**
637  * Gets a sd_event item for the current thread.
638  * @return a sd_event or NULL in case of error
639  */
640 struct sd_event *jobs_get_sd_event()
641 {
642         struct events *events;
643         struct thread *me;
644         int rc;
645
646         pthread_mutex_lock(&mutex);
647
648         /* search events on stack */
649         me = current;
650         while (me && !me->events)
651                 me = me->upper;
652         if (me)
653                 /* return the stacked events */
654                 events = me->events;
655         else {
656                 /* search an available events */
657                 events = events_get();
658                 if (!events) {
659                         /* not found, check if creation possible */
660                         if (nevents >= allowed) {
661                                 ERROR("not possible to add a new event");
662                                 events = NULL;
663                         } else {
664                                 events = malloc(sizeof *events);
665                                 if (events && (rc = sd_event_new(&events->event)) >= 0) {
666                                         if (nevents < started || start_one_thread() >= 0) {
667                                                 events->used = 0;
668                                                 events->runs = 0;
669                                                 events->next = first_events;
670                                                 first_events = events;
671                                         } else {
672                                                 ERROR("can't start thread for events");
673                                                 sd_event_unref(events->event);
674                                                 free(events);
675                                                 events = NULL;
676                                         }
677                                 } else {
678                                         if (!events) {
679                                                 ERROR("out of memory");
680                                                 errno = ENOMEM;
681                                         } else {
682                                                 free(events);
683                                                 ERROR("creation of sd_event failed: %m");
684                                                 events = NULL;
685                                                 errno = -rc;
686                                         } 
687                                 }
688                         }
689                 }
690                 if (events) {
691                         me = current;
692                         if (me) {
693                                 events->used = 1;
694                                 me->events = events;
695                         } else {
696                                 WARNING("event returned for unknown thread!");
697                         }
698                 }
699         }
700         pthread_mutex_unlock(&mutex);
701         return events ? events->event : NULL;
702 }
703
704 /**
705  * Enter the jobs processing loop.
706  * @param allowed_count Maximum count of thread for jobs including this one
707  * @param start_count   Count of thread to start now, must be lower.
708  * @param waiter_count  Maximum count of jobs that can be waiting.
709  * @param start         The start routine to activate (can't be NULL)
710  * @return 0 in case of success or -1 in case of error.
711  */
712 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum))
713 {
714         int rc, launched;
715         struct thread me;
716         struct job *job;
717
718         assert(allowed_count >= 1);
719         assert(start_count >= 0);
720         assert(waiter_count > 0);
721         assert(start_count <= allowed_count);
722
723         rc = -1;
724         pthread_mutex_lock(&mutex);
725
726         /* check whether already running */
727         if (current || allowed) {
728                 ERROR("thread already started");
729                 errno = EINVAL;
730                 goto error;
731         }
732
733         /* start */
734         if (sig_monitor_init() < 0) {
735                 ERROR("failed to initialise signal handlers");
736                 goto error;
737         }
738
739         /* records the allowed count */
740         allowed = allowed_count;
741         started = 0;
742         waiting = 0;
743         remains = waiter_count;
744
745         /* start at least one thread */
746         launched = 0;
747         while ((launched + 1) < start_count) {
748                 if (start_one_thread() != 0) {
749                         ERROR("Not all threads can be started");
750                         goto error;
751                 }
752                 launched++;
753         }
754
755         /* queue the start job */
756         job = job_create(NULL, 0, (job_cb_t)start, NULL);
757         if (!job) {
758                 ERROR("out of memory");
759                 errno = ENOMEM;
760                 goto error;
761         }
762         job_add(job);
763         remains--;
764
765         /* run until end */
766         thread_run(&me);
767         rc = 0;
768 error:
769         pthread_mutex_unlock(&mutex);
770         return rc;
771 }
772
773 /**
774  * Terminate all the threads and cancel all pending jobs.
775  */
776 void jobs_terminate()
777 {
778         struct job *job, *head, *tail;
779         pthread_t me, *others;
780         struct thread *t;
781         int count;
782
783         /* how am i? */
784         me = pthread_self();
785
786         /* request all threads to stop */
787         pthread_mutex_lock(&mutex);
788         allowed = 0;
789
790         /* count the number of threads */
791         count = 0;
792         t = threads;
793         while (t) {
794                 if (!t->upper && !pthread_equal(t->tid, me))
795                         count++;
796                 t = t->next;
797         }
798
799         /* fill the array of threads */
800         others = alloca(count * sizeof *others);
801         count = 0;
802         t = threads;
803         while (t) {
804                 if (!t->upper && !pthread_equal(t->tid, me))
805                         others[count++] = t->tid;
806                 t = t->next;
807         }
808
809         /* stops the threads */
810         t = threads;
811         while (t) {
812                 t->stop = 1;
813                 t = t->next;
814         }
815
816         /* wait the threads */
817         pthread_cond_broadcast(&cond);
818         pthread_mutex_unlock(&mutex);
819         while (count)
820                 pthread_join(others[--count], NULL);
821         pthread_mutex_lock(&mutex);
822
823         /* cancel pending jobs of other threads */
824         remains = 0;
825         head = first_job;
826         first_job = NULL;
827         tail = NULL;
828         while (head) {
829                 /* unlink the job */
830                 job = head;
831                 head = job->next;
832
833                 /* search if job is stacked for current */
834                 t = current;
835                 while (t && t->job != job)
836                         t = t->upper;
837                 if (t) {
838                         /* yes, relink it at end */
839                         if (tail)
840                                 tail->next = job;
841                         else
842                                 first_job = job;
843                         tail = job;
844                         job->next = NULL;
845                 } else {
846                         /* no cancel the job */
847                         pthread_mutex_unlock(&mutex);
848                         sig_monitor(0, job_cancel, job);
849                         free(job);
850                         pthread_mutex_lock(&mutex);
851                 }
852         }
853         pthread_mutex_unlock(&mutex);
854 }
855