Avoid to stick on event loop
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <stdint.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <time.h>
25 #include <sys/syscall.h>
26 #include <pthread.h>
27 #include <errno.h>
28 #include <assert.h>
29
30 #include <systemd/sd-event.h>
31
32 #include "jobs.h"
33 #include "sig-monitor.h"
34 #include "verbose.h"
35
36 #if 0
37 #define _alert_ "do you really want to remove monitoring?"
38 #define sig_monitor_init_timeouts()  ((void)0)
39 #define sig_monitor_clean_timeouts() ((void)0)
40 #define sig_monitor(to,cb,arg)       (cb(0,arg))
41 #endif
42
43 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
44 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
45
46 /** Internal shortcut for callback */
47 typedef void (*job_cb_t)(int, void*, void *, void*);
48
49 /** Description of a pending job */
50 struct job
51 {
52         struct job *next;    /**< link to the next job enqueued */
53         void *group;         /**< group of the request */
54         job_cb_t callback;   /**< processing callback */
55         void *arg1;          /**< first arg */
56         void *arg2;          /**< second arg */
57         void *arg3;          /**< third arg */
58         int timeout;         /**< timeout in second for processing the request */
59         unsigned blocked: 1; /**< is an other request blocking this one ? */
60         unsigned dropped: 1; /**< is removed ? */
61 };
62
63 /** Description of handled event loops */
64 struct events
65 {
66         struct events *next;
67         struct sd_event *event;
68         uint64_t timeout;
69         unsigned runs: 1;
70 };
71
72 /** Description of threads */
73 struct thread
74 {
75         struct thread *next;   /**< next thread of the list */
76         struct thread *upper;  /**< upper same thread */
77         struct job *job;       /**< currently processed job */
78         struct events *events; /**< currently processed job */
79         pthread_t tid;         /**< the thread id */
80         unsigned stop: 1;      /**< stop requested */
81         unsigned lowered: 1;   /**< has a lower same thread */
82         unsigned waits: 1;     /**< is waiting? */
83 };
84
85 /* synchronisation of threads */
86 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
87 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
88
89 /* count allowed, started and waiting threads */
90 static int allowed = 0; /** allowed count of threads */
91 static int started = 0; /** started count of threads */
92 static int waiting = 0; /** waiting count of threads */
93 static int remains = 0; /** allowed count of waiting jobs */
94 static int nevents = 0; /** count of events */
95
96 /* list of threads */
97 static struct thread *threads;
98 static _Thread_local struct thread *current;
99
100 /* queue of pending jobs */
101 static struct job *first_job;
102 static struct events *first_events;
103 static struct job *free_jobs;
104
105 /**
106  * Create a new job with the given parameters
107  * @param group    the group of the job
108  * @param timeout  the timeout of the job (0 if none)
109  * @param callback the function that achieves the job
110  * @param arg1     the first argument of the callback
111  * @param arg2     the second argument of the callback
112  * @param arg3     the third argument of the callback
113  * @return the created job unblock or NULL when no more memory
114  */
115 static struct job *job_create(
116                 void *group,
117                 int timeout,
118                 job_cb_t callback,
119                 void *arg1,
120                 void *arg2,
121                 void *arg3)
122 {
123         struct job *job;
124
125         /* try recyle existing job */
126         job = free_jobs;
127         if (job)
128                 free_jobs = job->next;
129         else {
130                 /* allocation  without blocking */
131                 pthread_mutex_unlock(&mutex);
132                 job = malloc(sizeof *job);
133                 pthread_mutex_lock(&mutex);
134                 if (!job) {
135                         errno = -ENOMEM;
136                         goto end;
137                 }
138         }
139         /* initialises the job */
140         job->group = group;
141         job->timeout = timeout;
142         job->callback = callback;
143         job->arg1 = arg1;
144         job->arg2 = arg2;
145         job->arg3 = arg3;
146         job->blocked = 0;
147         job->dropped = 0;
148 end:
149         return job;
150 }
151
152 /**
153  * Adds 'job' at the end of the list of jobs, marking it
154  * as blocked if an other job with the same group is pending.
155  * @param job the job to add
156  */
157 static void job_add(struct job *job)
158 {
159         void *group;
160         struct job *ijob, **pjob;
161
162         /* prepare to add */
163         group = job->group;
164         job->next = NULL;
165
166         /* search end and blockers */
167         pjob = &first_job;
168         ijob = first_job;
169         while (ijob) {
170                 if (group && ijob->group == group)
171                         job->blocked = 1;
172                 pjob = &ijob->next;
173                 ijob = ijob->next;
174         }
175
176         /* queue the jobs */
177         *pjob = job;
178 }
179
180 /**
181  * Get the next job to process or NULL if none.
182  * @return the first job that isn't blocked or NULL
183  */
184 static inline struct job *job_get()
185 {
186         struct job *job = first_job;
187         while (job && job->blocked)
188                 job = job->next;
189         return job;
190 }
191
192 /**
193  * Get the next events to process or NULL if none.
194  * @return the first events that isn't running or NULL
195  */
196 static inline struct events *events_get()
197 {
198         struct events *events = first_events;
199         while (events && events->runs)
200                 events = events->next;
201         return events;
202 }
203
204 /**
205  * Releases the processed 'job': removes it
206  * from the list of jobs and unblock the first
207  * pending job of the same group if any.
208  * @param job the job to release
209  */
210 static inline void job_release(struct job *job)
211 {
212         struct job *ijob, **pjob;
213         void *group;
214
215         /* first unqueue the job */
216         pjob = &first_job;
217         ijob = first_job;
218         while (ijob != job) {
219                 pjob = &ijob->next;
220                 ijob = ijob->next;
221         }
222         *pjob = job->next;
223
224         /* then unblock jobs of the same group */
225         group = job->group;
226         if (group) {
227                 ijob = job->next;
228                 while (ijob && ijob->group != group)
229                         ijob = ijob->next;
230                 if (ijob)
231                         ijob->blocked = 0;
232         }
233
234         /* recycle the job */
235         job->next = free_jobs;
236         free_jobs = job;
237 }
238
239 /**
240  * Monitored normal callback for a job.
241  * This function is called by the monitor
242  * to run the job when the safe environment
243  * is set.
244  * @param signum 0 on normal flow or the number
245  *               of the signal that interrupted the normal
246  *               flow
247  * @param arg     the job to run
248  */
249 static void job_call(int signum, void *arg)
250 {
251         struct job *job = arg;
252         job->callback(signum, job->arg1, job->arg2, job->arg3);
253 }
254
255 /**
256  * Monitored cancel callback for a job.
257  * This function is called by the monitor
258  * to cancel the job when the safe environment
259  * is set.
260  * @param signum 0 on normal flow or the number
261  *               of the signal that interrupted the normal
262  *               flow, isn't used
263  * @param arg    the job to run
264  */
265 static void job_cancel(int signum, void *arg)
266 {
267         job_call(SIGABRT, arg);
268 }
269
270 /**
271  * Monitored normal callback for events.
272  * This function is called by the monitor
273  * to run the event loop when the safe environment
274  * is set.
275  * @param signum 0 on normal flow or the number
276  *               of the signal that interrupted the normal
277  *               flow
278  * @param arg     the events to run
279  */
280 static void events_call(int signum, void *arg)
281 {
282         struct events *events = arg;
283         if (!signum)
284                 sd_event_run(events->event, events->timeout);
285 }
286
287 /**
288  * Main processing loop of threads processing jobs.
289  * The loop must be called with the mutex locked
290  * and it returns with the mutex locked.
291  * @param me the description of the thread to use
292  * TODO: how are timeout handled when reentering?
293  */
294 static void thread_run(volatile struct thread *me)
295 {
296         struct thread **prv;
297         struct job *job;
298         struct events *events;
299         uint64_t evto;
300
301         /* initialize description of itself and link it in the list */
302         me->tid = pthread_self();
303         me->stop = 0;
304         me->lowered = 0;
305         me->waits = 0;
306         me->upper = current;
307         if (current) {
308                 current->lowered = 1;
309                 evto = EVENT_TIMEOUT_CHILD;
310         } else {
311                 started++;
312                 sig_monitor_init_timeouts();
313                 evto = EVENT_TIMEOUT_TOP;
314         }
315         me->next = threads;
316         threads = (struct thread*)me;
317         current = (struct thread*)me;
318
319         NOTICE("job thread starting %d(/%d) %s", started, allowed, me->upper ? "child" : "parent");
320
321         /* loop until stopped */
322         me->events = NULL;
323         while (!me->stop) {
324                 /* get a job */
325                 job = job_get(first_job);
326                 if (job) {
327                         /* prepare running the job */
328                         remains++; /* increases count of job that can wait */
329                         job->blocked = 1; /* mark job as blocked */
330                         me->job = job; /* record the job (only for terminate) */
331
332                         /* run the job */
333                         pthread_mutex_unlock(&mutex);
334                         sig_monitor(job->timeout, job_call, job);
335                         pthread_mutex_lock(&mutex);
336
337                         /* release the run job */
338                         job_release(job);
339
340                         /* release event if any */
341                         events = me->events;
342                         if (events) {
343                                 events->runs = 0;
344                                 me->events = NULL;
345                         }
346                 } else {
347                         /* no job, check events */
348                         events = events_get();
349                         if (events) {
350                                 /* run the events */
351                                 events->runs = 1;
352                                 events->timeout = evto;
353                                 me->events = events;
354                                 pthread_mutex_unlock(&mutex);
355                                 sig_monitor(0, events_call, events);
356                                 pthread_mutex_lock(&mutex);
357                                 events->runs = 0;
358                                 me->events = NULL;
359                         } else {
360                                 /* no job and not events */
361                                 waiting++;
362                                 me->waits = 1;
363                                 pthread_cond_wait(&cond, &mutex);
364                                 me->waits = 0;
365                                 waiting--;
366                         }
367                 }
368         }
369         NOTICE("job thread stoping %d(/%d) %s", started, allowed, me->upper ? "child" : "parent");
370
371         /* unlink the current thread and cleanup */
372         prv = &threads;
373         while (*prv != me)
374                 prv = &(*prv)->next;
375         *prv = me->next;
376         current = me->upper;
377         if (current) {
378                 current->lowered = 0;
379         } else {
380                 sig_monitor_clean_timeouts();
381                 started--;
382         }
383 }
384
385 /**
386  * Entry point for created threads.
387  * @param data not used
388  * @return NULL
389  */
390 static void *thread_main(void *data)
391 {
392         struct thread me;
393
394         pthread_mutex_lock(&mutex);
395         thread_run(&me);
396         pthread_mutex_unlock(&mutex);
397         return NULL;
398 }
399
400 /**
401  * Starts a new thread
402  * @return 0 in case of success or -1 in case of error
403  */
404 static int start_one_thread()
405 {
406         pthread_t tid;
407         int rc;
408
409         rc = pthread_create(&tid, NULL, thread_main, NULL);
410         if (rc != 0) {
411                 /* errno = rc; */
412                 WARNING("not able to start thread: %m");
413                 rc = -1;
414         }
415         return rc;
416 }
417
418 /**
419  * Queues a new asynchronous job represented by 'callback'
420  * for the 'group' and the 'timeout'.
421  * Jobs are queued FIFO and are possibly executed in parallel
422  * concurrently except for job of the same group that are
423  * executed sequentially in FIFO order.
424  * @param group    The group of the job or NULL when no group.
425  * @param timeout  The maximum execution time in seconds of the job
426  *                 or 0 for unlimited time.
427  * @param callback The function to execute for achieving the job.
428  *                 Its first parameter is either 0 on normal flow
429  *                 or the signal number that broke the normal flow.
430  * @return 0 in case of success or -1 in case of error
431  */
432 int jobs_queue0(
433                 void *group,
434                 int timeout,
435                 void (*callback)(int signum))
436 {
437         return jobs_queue3(group, timeout, (job_cb_t)callback, NULL, NULL, NULL);
438 }
439
440 /**
441  * Queues a new asynchronous job represented by 'callback' and 'arg1'
442  * for the 'group' and the 'timeout'.
443  * Jobs are queued FIFO and are possibly executed in parallel
444  * concurrently except for job of the same group that are
445  * executed sequentially in FIFO order.
446  * @param group    The group of the job or NULL when no group.
447  * @param timeout  The maximum execution time in seconds of the job
448  *                 or 0 for unlimited time.
449  * @param callback The function to execute for achieving the job.
450  *                 Its first parameter is either 0 on normal flow
451  *                 or the signal number that broke the normal flow.
452  *                 The remaining parameter is the parameter 'arg1'
453  *                 given here.
454  * @param arg1     The second argument for 'callback'
455  * @return 0 in case of success or -1 in case of error
456  */
457 int jobs_queue(
458                 void *group,
459                 int timeout,
460                 void (*callback)(int, void*),
461                 void *arg)
462 {
463         return jobs_queue3(group, timeout, (job_cb_t)callback, arg, NULL, NULL);
464 }
465
466 /**
467  * Queues a new asynchronous job represented by 'callback' and 'arg[12]'
468  * for the 'group' and the 'timeout'.
469  * Jobs are queued FIFO and are possibly executed in parallel
470  * concurrently except for job of the same group that are
471  * executed sequentially in FIFO order.
472  * @param group    The group of the job or NULL when no group.
473  * @param timeout  The maximum execution time in seconds of the job
474  *                 or 0 for unlimited time.
475  * @param callback The function to execute for achieving the job.
476  *                 Its first parameter is either 0 on normal flow
477  *                 or the signal number that broke the normal flow.
478  *                 The remaining parameters are the parameters 'arg[12]'
479  *                 given here.
480  * @param arg1     The second argument for 'callback'
481  * @param arg2     The third argument for 'callback'
482  * @return 0 in case of success or -1 in case of error
483  */
484 int jobs_queue2(
485                 void *group,
486                 int timeout,
487                 void (*callback)(int, void*, void*),
488                 void *arg1,
489                 void *arg2)
490 {
491         return jobs_queue3(group, timeout, (job_cb_t)callback, arg1, arg2, NULL);
492 }
493
494 /**
495  * Queues a new asynchronous job represented by 'callback' and 'arg[123]'
496  * for the 'group' and the 'timeout'.
497  * Jobs are queued FIFO and are possibly executed in parallel
498  * concurrently except for job of the same group that are
499  * executed sequentially in FIFO order.
500  * @param group    The group of the job or NULL when no group.
501  * @param timeout  The maximum execution time in seconds of the job
502  *                 or 0 for unlimited time.
503  * @param callback The function to execute for achieving the job.
504  *                 Its first parameter is either 0 on normal flow
505  *                 or the signal number that broke the normal flow.
506  *                 The remaining parameters are the parameters 'arg[123]'
507  *                 given here.
508  * @param arg1     The second argument for 'callback'
509  * @param arg2     The third argument for 'callback'
510  * @param arg3     The forth argument for 'callback'
511  * @return 0 in case of success or -1 in case of error
512  */
513 int jobs_queue3(
514                 void *group,
515                 int timeout,
516                 void (*callback)(int, void*, void *, void*),
517                 void *arg1,
518                 void *arg2,
519                 void *arg3)
520 {
521         const char *info;
522         struct job *job;
523         int rc;
524
525         pthread_mutex_lock(&mutex);
526
527         /* allocates the job */
528         job = job_create(group, timeout, callback, arg1, arg2, arg3);
529         if (!job) {
530                 errno = ENOMEM;
531                 info = "out of memory";
532                 goto error;
533         }
534
535         /* check availability */
536         if (remains == 0) {
537                 errno = EBUSY;
538                 info = "too many jobs";
539                 goto error2;
540         }
541
542         /* start a thread if needed */
543         if (waiting == 0 && started < allowed) {
544                 /* all threads are busy and a new can be started */
545                 rc = start_one_thread();
546                 if (rc < 0 && started == 0) {
547                         info = "can't start first thread";
548                         goto error2;
549                 }
550         }
551
552         /* queues the job */
553         remains--;
554         job_add(job);
555
556         /* signal an existing job */
557         pthread_cond_signal(&cond);
558         pthread_mutex_unlock(&mutex);
559         return 0;
560
561 error2:
562         job->next = free_jobs;
563         free_jobs = job;
564 error:
565         ERROR("can't process job with threads: %s, %m", info);
566         pthread_mutex_unlock(&mutex);
567         return -1;
568 }
569
570 /**
571  * Enter a synchronisation point: activates the job given by 'callback'
572  * @param group the gro
573  */
574 int jobs_enter(
575                 void *group,
576                 int timeout,
577                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
578                 void *closure)
579 {
580         
581         struct job *job;
582         struct thread me;
583
584         pthread_mutex_lock(&mutex);
585
586         /* allocates the job */
587         job = job_create(group, timeout, (job_cb_t)callback, closure, &me, NULL);
588         if (!job) {
589                 ERROR("out of memory");
590                 errno = ENOMEM;
591                 pthread_mutex_unlock(&mutex);
592                 return -1;
593         }
594
595         /* queues the job */
596         job_add(job);
597
598         /* run until stopped */
599         thread_run(&me);
600         pthread_mutex_unlock(&mutex);
601         return 0;
602 }
603
604 int jobs_leave(struct jobloop *jobloop)
605 {
606         struct thread *t;
607         pthread_mutex_lock(&mutex);
608
609         t = threads;
610         while (t && t != (struct thread*)jobloop)
611                 t = t->next;
612         if (!t) {
613                 errno = EINVAL;
614         } else {
615                 t->stop = 1;
616                 if (t->waits)
617                         pthread_cond_broadcast(&cond);
618         }
619         pthread_mutex_unlock(&mutex);
620         return -!t;
621 }
622
623 /**
624  * Gets a sd_event item for the current thread.
625  * @return a sd_event or NULL in case of error
626  */
627 struct sd_event *jobs_get_sd_event()
628 {
629         struct events *events;
630         struct thread *me;
631         int rc;
632
633         pthread_mutex_lock(&mutex);
634
635         /* search events on stack */
636         me = current;
637         while (me && !me->events)
638                 me = me->upper;
639         if (me)
640                 /* return the stacked events */
641                 events = me->events;
642         else {
643                 /* search an available events */
644                 events = events_get();
645                 if (!events) {
646                         /* not found, check if creation possible */
647                         if (nevents >= allowed) {
648                                 ERROR("not possible to add a new event");
649                                 events = NULL;
650                         } else {
651                                 events = malloc(sizeof *events);
652                                 if (events && (rc = sd_event_new(&events->event)) >= 0) {
653                                         if (nevents < started || start_one_thread() >= 0) {
654                                                 events->runs = 0;
655                                                 events->next = first_events;
656                                                 first_events = events;
657                                         } else {
658                                                 ERROR("can't start thread for events");
659                                                 sd_event_unref(events->event);
660                                                 free(events);
661                                                 events = NULL;
662                                         }
663                                 } else {
664                                         if (!events) {
665                                                 ERROR("out of memory");
666                                                 errno = ENOMEM;
667                                         } else {
668                                                 free(events);
669                                                 ERROR("creation of sd_event failed: %m");
670                                                 events = NULL;
671                                                 errno = -rc;
672                                         } 
673                                 }
674                         }
675                 }
676                 if (events) {
677                         /* */
678                         me = current;
679                         if (me) {
680                                 events->runs = 1;
681                                 me->events = events;
682                         } else {
683                                 WARNING("event returned for unknown thread!");
684                         }
685                 }
686         }
687         pthread_mutex_unlock(&mutex);
688         return events ? events->event : NULL;
689 }
690
691 /**
692  * Enter the jobs processing loop.
693  * @param allowed_count Maximum count of thread for jobs including this one
694  * @param start_count   Count of thread to start now, must be lower.
695  * @param waiter_count  Maximum count of jobs that can be waiting.
696  * @param start         The start routine to activate (can't be NULL)
697  * @return 0 in case of success or -1 in case of error.
698  */
699 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)())
700 {
701         int rc, launched;
702         struct thread me;
703         struct job *job;
704
705         assert(allowed_count >= 1);
706         assert(start_count >= 0);
707         assert(waiter_count > 0);
708         assert(start_count <= allowed_count);
709
710         rc = -1;
711         pthread_mutex_lock(&mutex);
712
713         /* check whether already running */
714         if (current || allowed) {
715                 ERROR("thread already started");
716                 errno = EINVAL;
717                 goto error;
718         }
719
720         /* start */
721         if (sig_monitor_init() < 0) {
722                 ERROR("failed to initialise signal handlers");
723                 goto error;
724         }
725
726         /* records the allowed count */
727         allowed = allowed_count;
728         started = 0;
729         waiting = 0;
730         remains = waiter_count;
731
732         /* start at least one thread */
733         launched = 0;
734         while ((launched + 1) < start_count) {
735                 if (start_one_thread() != 0) {
736                         ERROR("Not all threads can be started");
737                         goto error;
738                 }
739                 launched++;
740         }
741
742         /* queue the start job */
743         job = job_create(NULL, 0, (job_cb_t)start, NULL, NULL, NULL);
744         if (!job) {
745                 ERROR("out of memory");
746                 errno = ENOMEM;
747                 goto error;
748         }
749         job_add(job);
750         remains--;
751
752         /* run until end */
753         thread_run(&me);
754         rc = 0;
755 error:
756         pthread_mutex_unlock(&mutex);
757         return rc;
758 }
759
760 /**
761  * Terminate all the threads and cancel all pending jobs.
762  */
763 void jobs_terminate()
764 {
765         struct job *job, *head, *tail;
766         pthread_t me, *others;
767         struct thread *t;
768         int count;
769
770         /* how am i? */
771         me = pthread_self();
772
773         /* request all threads to stop */
774         pthread_mutex_lock(&mutex);
775         allowed = 0;
776
777         /* count the number of threads */
778         count = 0;
779         t = threads;
780         while (t) {
781                 if (!t->upper && !pthread_equal(t->tid, me))
782                         count++;
783                 t = t->next;
784         }
785
786         /* fill the array of threads */
787         others = alloca(count * sizeof *others);
788         count = 0;
789         t = threads;
790         while (t) {
791                 if (!t->upper && !pthread_equal(t->tid, me))
792                         others[count++] = t->tid;
793                 t = t->next;
794         }
795
796         /* stops the threads */
797         t = threads;
798         while (t) {
799                 t->stop = 1;
800                 t = t->next;
801         }
802
803         /* wait the threads */
804         pthread_cond_broadcast(&cond);
805         pthread_mutex_unlock(&mutex);
806         while (count)
807                 pthread_join(others[--count], NULL);
808         pthread_mutex_lock(&mutex);
809
810         /* cancel pending jobs of other threads */
811         remains = 0;
812         head = first_job;
813         first_job = NULL;
814         tail = NULL;
815         while (head) {
816                 /* unlink the job */
817                 job = head;
818                 head = job->next;
819
820                 /* search if job is stacked for current */
821                 t = current;
822                 while (t && t->job != job)
823                         t = t->upper;
824                 if (t) {
825                         /* yes, relink it at end */
826                         if (tail)
827                                 tail->next = job;
828                         else
829                                 first_job = job;
830                         tail = job;
831                         job->next = NULL;
832                 } else {
833                         /* no cancel the job */
834                         pthread_mutex_unlock(&mutex);
835                         sig_monitor(0, job_cancel, job);
836                         free(job);
837                         pthread_mutex_lock(&mutex);
838                 }
839         }
840         pthread_mutex_unlock(&mutex);
841 }
842