8725d00e6cdd4246e5014ab7b6124ad6bb74ca12
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <stdint.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <time.h>
25 #include <sys/syscall.h>
26 #include <pthread.h>
27 #include <errno.h>
28 #include <assert.h>
29
30 #include <systemd/sd-event.h>
31
32 #include "jobs.h"
33 #include "sig-monitor.h"
34 #include "verbose.h"
35
36 #if 0
37 #define _alert_ "do you really want to remove monitoring?"
38 #define sig_monitor_init_timeouts()  ((void)0)
39 #define sig_monitor_clean_timeouts() ((void)0)
40 #define sig_monitor(to,cb,arg)       (cb(0,arg))
41 #endif
42
43 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
44 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
45
46 /** Internal shortcut for callback */
47 typedef void (*job_cb_t)(int, void*);
48
49 /** Description of a pending job */
50 struct job
51 {
52         struct job *next;    /**< link to the next job enqueued */
53         void *group;         /**< group of the request */
54         job_cb_t callback;   /**< processing callback */
55         void *arg;           /**< argument */
56         int timeout;         /**< timeout in second for processing the request */
57         unsigned blocked: 1; /**< is an other request blocking this one ? */
58         unsigned dropped: 1; /**< is removed ? */
59 };
60
61 /** Description of handled event loops */
62 struct events
63 {
64         struct events *next;
65         struct sd_event *event;
66         uint64_t timeout;
67         unsigned runs: 1;
68 };
69
70 /** Description of threads */
71 struct thread
72 {
73         struct thread *next;   /**< next thread of the list */
74         struct thread *upper;  /**< upper same thread */
75         struct job *job;       /**< currently processed job */
76         struct events *events; /**< currently processed job */
77         pthread_t tid;         /**< the thread id */
78         unsigned stop: 1;      /**< stop requested */
79         unsigned lowered: 1;   /**< has a lower same thread */
80         unsigned waits: 1;     /**< is waiting? */
81 };
82
83 /**
84  * Description of synchonous callback
85  */
86 struct sync
87 {
88         struct thread thread;   /**< thread loop data */
89         union {
90                 void (*callback)(int, void*);   /**< the synchronous callback */
91                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
92                                 /**< the entering synchronous routine */
93         };
94         void *arg;              /**< the argument of the callback */
95 };
96
97
98 /* synchronisation of threads */
99 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
100 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
101
102 /* count allowed, started and waiting threads */
103 static int allowed = 0; /** allowed count of threads */
104 static int started = 0; /** started count of threads */
105 static int waiting = 0; /** waiting count of threads */
106 static int remains = 0; /** allowed count of waiting jobs */
107 static int nevents = 0; /** count of events */
108
109 /* list of threads */
110 static struct thread *threads;
111 static _Thread_local struct thread *current;
112
113 /* queue of pending jobs */
114 static struct job *first_job;
115 static struct events *first_events;
116 static struct job *free_jobs;
117
118 /**
119  * Create a new job with the given parameters
120  * @param group    the group of the job
121  * @param timeout  the timeout of the job (0 if none)
122  * @param callback the function that achieves the job
123  * @param arg      the argument of the callback
124  * @return the created job unblock or NULL when no more memory
125  */
126 static struct job *job_create(
127                 void *group,
128                 int timeout,
129                 job_cb_t callback,
130                 void *arg)
131 {
132         struct job *job;
133
134         /* try recyle existing job */
135         job = free_jobs;
136         if (job)
137                 free_jobs = job->next;
138         else {
139                 /* allocation  without blocking */
140                 pthread_mutex_unlock(&mutex);
141                 job = malloc(sizeof *job);
142                 pthread_mutex_lock(&mutex);
143                 if (!job) {
144                         errno = -ENOMEM;
145                         goto end;
146                 }
147         }
148         /* initialises the job */
149         job->group = group;
150         job->timeout = timeout;
151         job->callback = callback;
152         job->arg = arg;
153         job->blocked = 0;
154         job->dropped = 0;
155 end:
156         return job;
157 }
158
159 /**
160  * Adds 'job' at the end of the list of jobs, marking it
161  * as blocked if an other job with the same group is pending.
162  * @param job the job to add
163  */
164 static void job_add(struct job *job)
165 {
166         void *group;
167         struct job *ijob, **pjob;
168
169         /* prepare to add */
170         group = job->group;
171         job->next = NULL;
172
173         /* search end and blockers */
174         pjob = &first_job;
175         ijob = first_job;
176         while (ijob) {
177                 if (group && ijob->group == group)
178                         job->blocked = 1;
179                 pjob = &ijob->next;
180                 ijob = ijob->next;
181         }
182
183         /* queue the jobs */
184         *pjob = job;
185 }
186
187 /**
188  * Get the next job to process or NULL if none.
189  * @return the first job that isn't blocked or NULL
190  */
191 static inline struct job *job_get()
192 {
193         struct job *job = first_job;
194         while (job && job->blocked)
195                 job = job->next;
196         return job;
197 }
198
199 /**
200  * Get the next events to process or NULL if none.
201  * @return the first events that isn't running or NULL
202  */
203 static inline struct events *events_get()
204 {
205         struct events *events = first_events;
206         while (events && events->runs)
207                 events = events->next;
208         return events;
209 }
210
211 /**
212  * Releases the processed 'job': removes it
213  * from the list of jobs and unblock the first
214  * pending job of the same group if any.
215  * @param job the job to release
216  */
217 static inline void job_release(struct job *job)
218 {
219         struct job *ijob, **pjob;
220         void *group;
221
222         /* first unqueue the job */
223         pjob = &first_job;
224         ijob = first_job;
225         while (ijob != job) {
226                 pjob = &ijob->next;
227                 ijob = ijob->next;
228         }
229         *pjob = job->next;
230
231         /* then unblock jobs of the same group */
232         group = job->group;
233         if (group) {
234                 ijob = job->next;
235                 while (ijob && ijob->group != group)
236                         ijob = ijob->next;
237                 if (ijob)
238                         ijob->blocked = 0;
239         }
240
241         /* recycle the job */
242         job->next = free_jobs;
243         free_jobs = job;
244 }
245
246 /**
247  * Monitored cancel callback for a job.
248  * This function is called by the monitor
249  * to cancel the job when the safe environment
250  * is set.
251  * @param signum 0 on normal flow or the number
252  *               of the signal that interrupted the normal
253  *               flow, isn't used
254  * @param arg    the job to run
255  */
256 static void job_cancel(int signum, void *arg)
257 {
258         struct job *job = arg;
259         job->callback(SIGABRT, job->arg);
260 }
261
262 /**
263  * Monitored normal callback for events.
264  * This function is called by the monitor
265  * to run the event loop when the safe environment
266  * is set.
267  * @param signum 0 on normal flow or the number
268  *               of the signal that interrupted the normal
269  *               flow
270  * @param arg     the events to run
271  */
272 static void events_call(int signum, void *arg)
273 {
274         struct events *events = arg;
275         if (!signum)
276                 sd_event_run(events->event, events->timeout);
277 }
278
279 /**
280  * Main processing loop of threads processing jobs.
281  * The loop must be called with the mutex locked
282  * and it returns with the mutex locked.
283  * @param me the description of the thread to use
284  * TODO: how are timeout handled when reentering?
285  */
286 static void thread_run(volatile struct thread *me)
287 {
288         struct thread **prv;
289         struct job *job;
290         struct events *events;
291         uint64_t evto;
292
293         /* initialize description of itself and link it in the list */
294         me->tid = pthread_self();
295         me->stop = 0;
296         me->lowered = 0;
297         me->waits = 0;
298         me->upper = current;
299         if (current) {
300                 current->lowered = 1;
301                 evto = EVENT_TIMEOUT_CHILD;
302         } else {
303                 started++;
304                 sig_monitor_init_timeouts();
305                 evto = EVENT_TIMEOUT_TOP;
306         }
307         me->next = threads;
308         threads = (struct thread*)me;
309         current = (struct thread*)me;
310
311         /* loop until stopped */
312         me->events = NULL;
313         while (!me->stop) {
314                 /* get a job */
315                 job = job_get(first_job);
316                 if (job) {
317                         /* prepare running the job */
318                         remains++; /* increases count of job that can wait */
319                         job->blocked = 1; /* mark job as blocked */
320                         me->job = job; /* record the job (only for terminate) */
321
322                         /* run the job */
323                         pthread_mutex_unlock(&mutex);
324                         sig_monitor(job->timeout, job->callback, job->arg);
325                         pthread_mutex_lock(&mutex);
326
327                         /* release the run job */
328                         job_release(job);
329
330                         /* release event if any */
331                         events = me->events;
332                         if (events) {
333                                 events->runs = 0;
334                                 me->events = NULL;
335                         }
336                 } else {
337                         /* no job, check events */
338                         events = events_get();
339                         if (events) {
340                                 /* run the events */
341                                 events->runs = 1;
342                                 events->timeout = evto;
343                                 me->events = events;
344                                 pthread_mutex_unlock(&mutex);
345                                 sig_monitor(0, events_call, events);
346                                 pthread_mutex_lock(&mutex);
347                                 events->runs = 0;
348                                 me->events = NULL;
349                         } else {
350                                 /* no job and not events */
351                                 waiting++;
352                                 me->waits = 1;
353                                 pthread_cond_wait(&cond, &mutex);
354                                 me->waits = 0;
355                                 waiting--;
356                         }
357                 }
358         }
359
360         /* unlink the current thread and cleanup */
361         prv = &threads;
362         while (*prv != me)
363                 prv = &(*prv)->next;
364         *prv = me->next;
365         current = me->upper;
366         if (current) {
367                 current->lowered = 0;
368         } else {
369                 sig_monitor_clean_timeouts();
370                 started--;
371         }
372 }
373
374 /**
375  * Entry point for created threads.
376  * @param data not used
377  * @return NULL
378  */
379 static void *thread_main(void *data)
380 {
381         struct thread me;
382
383         pthread_mutex_lock(&mutex);
384         thread_run(&me);
385         pthread_mutex_unlock(&mutex);
386         return NULL;
387 }
388
389 /**
390  * Starts a new thread
391  * @return 0 in case of success or -1 in case of error
392  */
393 static int start_one_thread()
394 {
395         pthread_t tid;
396         int rc;
397
398         rc = pthread_create(&tid, NULL, thread_main, NULL);
399         if (rc != 0) {
400                 /* errno = rc; */
401                 WARNING("not able to start thread: %m");
402                 rc = -1;
403         }
404         return rc;
405 }
406
407 /**
408  * Queues a new asynchronous job represented by 'callback' and 'arg'
409  * for the 'group' and the 'timeout'.
410  * Jobs are queued FIFO and are possibly executed in parallel
411  * concurrently except for job of the same group that are
412  * executed sequentially in FIFO order.
413  * @param group    The group of the job or NULL when no group.
414  * @param timeout  The maximum execution time in seconds of the job
415  *                 or 0 for unlimited time.
416  * @param callback The function to execute for achieving the job.
417  *                 Its first parameter is either 0 on normal flow
418  *                 or the signal number that broke the normal flow.
419  *                 The remaining parameter is the parameter 'arg1'
420  *                 given here.
421  * @param arg      The second argument for 'callback'
422  * @return 0 in case of success or -1 in case of error
423  */
424 int jobs_queue(
425                 void *group,
426                 int timeout,
427                 void (*callback)(int, void*),
428                 void *arg)
429 {
430         const char *info;
431         struct job *job;
432         int rc;
433
434         pthread_mutex_lock(&mutex);
435
436         /* allocates the job */
437         job = job_create(group, timeout, callback, arg);
438         if (!job) {
439                 errno = ENOMEM;
440                 info = "out of memory";
441                 goto error;
442         }
443
444         /* check availability */
445         if (remains == 0) {
446                 errno = EBUSY;
447                 info = "too many jobs";
448                 goto error2;
449         }
450
451         /* start a thread if needed */
452         if (waiting == 0 && started < allowed) {
453                 /* all threads are busy and a new can be started */
454                 rc = start_one_thread();
455                 if (rc < 0 && started == 0) {
456                         info = "can't start first thread";
457                         goto error2;
458                 }
459         }
460
461         /* queues the job */
462         remains--;
463         job_add(job);
464
465         /* signal an existing job */
466         pthread_cond_signal(&cond);
467         pthread_mutex_unlock(&mutex);
468         return 0;
469
470 error2:
471         job->next = free_jobs;
472         free_jobs = job;
473 error:
474         ERROR("can't process job with threads: %s, %m", info);
475         pthread_mutex_unlock(&mutex);
476         return -1;
477 }
478
479 /**
480  * Internal helper function for 'jobs_enter'.
481  * @see jobs_enter, jobs_leave
482  */
483 static void enter_cb(int signum, void *closure)
484 {
485         struct sync *sync = closure;
486         sync->enter(signum, sync->arg, (void*)&sync->thread);
487 }
488
489 /**
490  * Internal helper function for 'jobs_call'.
491  * @see jobs_call
492  */
493 static void call_cb(int signum, void *closure)
494 {
495         struct sync *sync = closure;
496         sync->callback(signum, sync->arg);
497         jobs_leave((void*)&sync->thread);
498 }
499
500 /**
501  * Internal helper for synchronous jobs. It enters
502  * a new thread loop for evaluating the given job
503  * as recorded by the couple 'sync_cb' and 'sync'.
504  * @see jobs_call, jobs_enter, jobs_leave
505  */
506 static int do_sync(
507                 void *group,
508                 int timeout,
509                 void (*sync_cb)(int signum, void *closure),
510                 struct sync *sync
511 )
512 {
513         struct job *job;
514
515         pthread_mutex_lock(&mutex);
516
517         /* allocates the job */
518         job = job_create(group, timeout, sync_cb, sync);
519         if (!job) {
520                 ERROR("out of memory");
521                 errno = ENOMEM;
522                 pthread_mutex_unlock(&mutex);
523                 return -1;
524         }
525
526         /* queues the job */
527         job_add(job);
528
529         /* run until stopped */
530         thread_run(&sync->thread);
531         pthread_mutex_unlock(&mutex);
532         return 0;
533 }
534
535 /**
536  * Enter a synchronisation point: activates the job given by 'callback'
537  * and 'closure' using 'group' and 'timeout' to control sequencing and
538  * execution time.
539  * @param group the group for sequencing jobs
540  * @param timeout the time in seconds allocated to the job
541  * @param callback the callback that will handle the job.
542  *                 it receives 3 parameters: 'signum' that will be 0
543  *                 on normal flow or the catched signal number in case
544  *                 of interrupted flow, the context 'closure' as given and
545  *                 a 'jobloop' reference that must be used when the job is
546  *                 terminated to unlock the current execution flow.
547  * @param arg the argument to the callback
548  * @return 0 on success or -1 in case of error
549  */
550 int jobs_enter(
551                 void *group,
552                 int timeout,
553                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
554                 void *closure
555 )
556 {
557         struct sync sync;
558
559         sync.enter = callback;
560         sync.arg = closure;
561         return do_sync(group, timeout, enter_cb, &sync);
562 }
563
564 /**
565  * Unlocks the execution flow designed by 'jobloop'.
566  * @param jobloop indication of the flow to unlock
567  * @return 0 in case of success of -1 on error
568  */
569 int jobs_leave(struct jobloop *jobloop)
570 {
571         struct thread *t;
572
573         pthread_mutex_lock(&mutex);
574         t = threads;
575         while (t && t != (struct thread*)jobloop)
576                 t = t->next;
577         if (!t) {
578                 errno = EINVAL;
579         } else {
580                 t->stop = 1;
581                 if (t->waits)
582                         pthread_cond_broadcast(&cond);
583         }
584         pthread_mutex_unlock(&mutex);
585         return -!t;
586 }
587
588 /**
589  * Calls synchronously the job represented by 'callback' and 'arg1'
590  * for the 'group' and the 'timeout' and waits for its completion.
591  * @param group    The group of the job or NULL when no group.
592  * @param timeout  The maximum execution time in seconds of the job
593  *                 or 0 for unlimited time.
594  * @param callback The function to execute for achieving the job.
595  *                 Its first parameter is either 0 on normal flow
596  *                 or the signal number that broke the normal flow.
597  *                 The remaining parameter is the parameter 'arg1'
598  *                 given here.
599  * @param arg      The second argument for 'callback'
600  * @return 0 in case of success or -1 in case of error
601  */
602 int jobs_call(
603                 void *group,
604                 int timeout,
605                 void (*callback)(int, void*),
606                 void *arg)
607 {
608         struct sync sync;
609
610         sync.callback = callback;
611         sync.arg = arg;
612
613         return do_sync(group, timeout, call_cb, &sync);
614 }
615
616 /**
617  * Gets a sd_event item for the current thread.
618  * @return a sd_event or NULL in case of error
619  */
620 struct sd_event *jobs_get_sd_event()
621 {
622         struct events *events;
623         struct thread *me;
624         int rc;
625
626         pthread_mutex_lock(&mutex);
627
628         /* search events on stack */
629         me = current;
630         while (me && !me->events)
631                 me = me->upper;
632         if (me)
633                 /* return the stacked events */
634                 events = me->events;
635         else {
636                 /* search an available events */
637                 events = events_get();
638                 if (!events) {
639                         /* not found, check if creation possible */
640                         if (nevents >= allowed) {
641                                 ERROR("not possible to add a new event");
642                                 events = NULL;
643                         } else {
644                                 events = malloc(sizeof *events);
645                                 if (events && (rc = sd_event_new(&events->event)) >= 0) {
646                                         if (nevents < started || start_one_thread() >= 0) {
647                                                 events->runs = 0;
648                                                 events->next = first_events;
649                                                 first_events = events;
650                                         } else {
651                                                 ERROR("can't start thread for events");
652                                                 sd_event_unref(events->event);
653                                                 free(events);
654                                                 events = NULL;
655                                         }
656                                 } else {
657                                         if (!events) {
658                                                 ERROR("out of memory");
659                                                 errno = ENOMEM;
660                                         } else {
661                                                 free(events);
662                                                 ERROR("creation of sd_event failed: %m");
663                                                 events = NULL;
664                                                 errno = -rc;
665                                         } 
666                                 }
667                         }
668                 }
669                 if (events) {
670                         /* */
671                         me = current;
672                         if (me) {
673                                 events->runs = 1;
674                                 me->events = events;
675                         } else {
676                                 WARNING("event returned for unknown thread!");
677                         }
678                 }
679         }
680         pthread_mutex_unlock(&mutex);
681         return events ? events->event : NULL;
682 }
683
684 /**
685  * Enter the jobs processing loop.
686  * @param allowed_count Maximum count of thread for jobs including this one
687  * @param start_count   Count of thread to start now, must be lower.
688  * @param waiter_count  Maximum count of jobs that can be waiting.
689  * @param start         The start routine to activate (can't be NULL)
690  * @return 0 in case of success or -1 in case of error.
691  */
692 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)())
693 {
694         int rc, launched;
695         struct thread me;
696         struct job *job;
697
698         assert(allowed_count >= 1);
699         assert(start_count >= 0);
700         assert(waiter_count > 0);
701         assert(start_count <= allowed_count);
702
703         rc = -1;
704         pthread_mutex_lock(&mutex);
705
706         /* check whether already running */
707         if (current || allowed) {
708                 ERROR("thread already started");
709                 errno = EINVAL;
710                 goto error;
711         }
712
713         /* start */
714         if (sig_monitor_init() < 0) {
715                 ERROR("failed to initialise signal handlers");
716                 goto error;
717         }
718
719         /* records the allowed count */
720         allowed = allowed_count;
721         started = 0;
722         waiting = 0;
723         remains = waiter_count;
724
725         /* start at least one thread */
726         launched = 0;
727         while ((launched + 1) < start_count) {
728                 if (start_one_thread() != 0) {
729                         ERROR("Not all threads can be started");
730                         goto error;
731                 }
732                 launched++;
733         }
734
735         /* queue the start job */
736         job = job_create(NULL, 0, (job_cb_t)start, NULL);
737         if (!job) {
738                 ERROR("out of memory");
739                 errno = ENOMEM;
740                 goto error;
741         }
742         job_add(job);
743         remains--;
744
745         /* run until end */
746         thread_run(&me);
747         rc = 0;
748 error:
749         pthread_mutex_unlock(&mutex);
750         return rc;
751 }
752
753 /**
754  * Terminate all the threads and cancel all pending jobs.
755  */
756 void jobs_terminate()
757 {
758         struct job *job, *head, *tail;
759         pthread_t me, *others;
760         struct thread *t;
761         int count;
762
763         /* how am i? */
764         me = pthread_self();
765
766         /* request all threads to stop */
767         pthread_mutex_lock(&mutex);
768         allowed = 0;
769
770         /* count the number of threads */
771         count = 0;
772         t = threads;
773         while (t) {
774                 if (!t->upper && !pthread_equal(t->tid, me))
775                         count++;
776                 t = t->next;
777         }
778
779         /* fill the array of threads */
780         others = alloca(count * sizeof *others);
781         count = 0;
782         t = threads;
783         while (t) {
784                 if (!t->upper && !pthread_equal(t->tid, me))
785                         others[count++] = t->tid;
786                 t = t->next;
787         }
788
789         /* stops the threads */
790         t = threads;
791         while (t) {
792                 t->stop = 1;
793                 t = t->next;
794         }
795
796         /* wait the threads */
797         pthread_cond_broadcast(&cond);
798         pthread_mutex_unlock(&mutex);
799         while (count)
800                 pthread_join(others[--count], NULL);
801         pthread_mutex_lock(&mutex);
802
803         /* cancel pending jobs of other threads */
804         remains = 0;
805         head = first_job;
806         first_job = NULL;
807         tail = NULL;
808         while (head) {
809                 /* unlink the job */
810                 job = head;
811                 head = job->next;
812
813                 /* search if job is stacked for current */
814                 t = current;
815                 while (t && t->job != job)
816                         t = t->upper;
817                 if (t) {
818                         /* yes, relink it at end */
819                         if (tail)
820                                 tail->next = job;
821                         else
822                                 first_job = job;
823                         tail = job;
824                         job->next = NULL;
825                 } else {
826                         /* no cancel the job */
827                         pthread_mutex_unlock(&mutex);
828                         sig_monitor(0, job_cancel, job);
829                         free(job);
830                         pthread_mutex_lock(&mutex);
831                 }
832         }
833         pthread_mutex_unlock(&mutex);
834 }
835