jobs: Improve event loop integration
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <stdint.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <time.h>
25 #include <sys/syscall.h>
26 #include <pthread.h>
27 #include <errno.h>
28 #include <assert.h>
29
30 #include <systemd/sd-event.h>
31
32 #include "jobs.h"
33 #include "sig-monitor.h"
34 #include "verbose.h"
35
36 #if 0
37 #define _alert_ "do you really want to remove monitoring?"
38 #define sig_monitor_init_timeouts()  ((void)0)
39 #define sig_monitor_clean_timeouts() ((void)0)
40 #define sig_monitor(to,cb,arg)       (cb(0,arg))
41 #endif
42
43 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
44 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
45
46 /** Internal shortcut for callback */
47 typedef void (*job_cb_t)(int, void*);
48
49 /** Description of a pending job */
50 struct job
51 {
52         struct job *next;    /**< link to the next job enqueued */
53         const void *group;   /**< group of the request */
54         job_cb_t callback;   /**< processing callback */
55         void *arg;           /**< argument */
56         int timeout;         /**< timeout in second for processing the request */
57         unsigned blocked: 1; /**< is an other request blocking this one ? */
58         unsigned dropped: 1; /**< is removed ? */
59 };
60
61 /** Description of handled event loops */
62 struct events
63 {
64         struct events *next;
65         struct sd_event *event;
66         uint64_t timeout;
67         enum {
68                 Available,
69                 Modifiable,
70                 Locked
71         } state;
72 };
73
74 /** Description of threads */
75 struct thread
76 {
77         struct thread *next;   /**< next thread of the list */
78         struct thread *upper;  /**< upper same thread */
79         struct job *job;       /**< currently processed job */
80         pthread_t tid;         /**< the thread id */
81         unsigned stop: 1;      /**< stop requested */
82         unsigned waits: 1;     /**< is waiting? */
83 };
84
85 /**
86  * Description of synchonous callback
87  */
88 struct sync
89 {
90         struct thread thread;   /**< thread loop data */
91         union {
92                 void (*callback)(int, void*);   /**< the synchronous callback */
93                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
94                                 /**< the entering synchronous routine */
95         };
96         void *arg;              /**< the argument of the callback */
97 };
98
99
100 /* synchronisation of threads */
101 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
102 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
103
104 /* count allowed, started and waiting threads */
105 static int allowed = 0; /** allowed count of threads */
106 static int started = 0; /** started count of threads */
107 static int waiting = 0; /** waiting count of threads */
108 static int remains = 0; /** allowed count of waiting jobs */
109 static int nevents = 0; /** count of events */
110
111 /* list of threads */
112 static struct thread *threads;
113 static _Thread_local struct thread *current_thread;
114 static _Thread_local struct events *current_events;
115
116 /* queue of pending jobs */
117 static struct job *first_job;
118 static struct events *first_events;
119 static struct job *free_jobs;
120
121 /**
122  * Create a new job with the given parameters
123  * @param group    the group of the job
124  * @param timeout  the timeout of the job (0 if none)
125  * @param callback the function that achieves the job
126  * @param arg      the argument of the callback
127  * @return the created job unblock or NULL when no more memory
128  */
129 static struct job *job_create(
130                 const void *group,
131                 int timeout,
132                 job_cb_t callback,
133                 void *arg)
134 {
135         struct job *job;
136
137         /* try recyle existing job */
138         job = free_jobs;
139         if (job)
140                 free_jobs = job->next;
141         else {
142                 /* allocation  without blocking */
143                 pthread_mutex_unlock(&mutex);
144                 job = malloc(sizeof *job);
145                 pthread_mutex_lock(&mutex);
146                 if (!job) {
147                         errno = -ENOMEM;
148                         goto end;
149                 }
150         }
151         /* initialises the job */
152         job->group = group;
153         job->timeout = timeout;
154         job->callback = callback;
155         job->arg = arg;
156         job->blocked = 0;
157         job->dropped = 0;
158 end:
159         return job;
160 }
161
162 /**
163  * Adds 'job' at the end of the list of jobs, marking it
164  * as blocked if an other job with the same group is pending.
165  * @param job the job to add
166  */
167 static void job_add(struct job *job)
168 {
169         const void *group;
170         struct job *ijob, **pjob;
171
172         /* prepare to add */
173         group = job->group;
174         job->next = NULL;
175
176         /* search end and blockers */
177         pjob = &first_job;
178         ijob = first_job;
179         while (ijob) {
180                 if (group && ijob->group == group)
181                         job->blocked = 1;
182                 pjob = &ijob->next;
183                 ijob = ijob->next;
184         }
185
186         /* queue the jobs */
187         *pjob = job;
188 }
189
190 /**
191  * Get the next job to process or NULL if none.
192  * @return the first job that isn't blocked or NULL
193  */
194 static inline struct job *job_get()
195 {
196         struct job *job = first_job;
197         while (job && job->blocked)
198                 job = job->next;
199         return job;
200 }
201
202 /**
203  * Get the next events to process or NULL if none.
204  * @return the first events that isn't running or NULL
205  */
206 static inline struct events *events_get()
207 {
208         struct events *events = first_events;
209         while (events && events->state != Available)
210                 events = events->next;
211         return events;
212 }
213
214 /**
215  * Releases the processed 'job': removes it
216  * from the list of jobs and unblock the first
217  * pending job of the same group if any.
218  * @param job the job to release
219  */
220 static inline void job_release(struct job *job)
221 {
222         struct job *ijob, **pjob;
223         const void *group;
224
225         /* first unqueue the job */
226         pjob = &first_job;
227         ijob = first_job;
228         while (ijob != job) {
229                 pjob = &ijob->next;
230                 ijob = ijob->next;
231         }
232         *pjob = job->next;
233
234         /* then unblock jobs of the same group */
235         group = job->group;
236         if (group) {
237                 ijob = job->next;
238                 while (ijob && ijob->group != group)
239                         ijob = ijob->next;
240                 if (ijob)
241                         ijob->blocked = 0;
242         }
243
244         /* recycle the job */
245         job->next = free_jobs;
246         free_jobs = job;
247 }
248
249 /**
250  * Monitored cancel callback for a job.
251  * This function is called by the monitor
252  * to cancel the job when the safe environment
253  * is set.
254  * @param signum 0 on normal flow or the number
255  *               of the signal that interrupted the normal
256  *               flow, isn't used
257  * @param arg    the job to run
258  */
259 static void job_cancel(int signum, void *arg)
260 {
261         struct job *job = arg;
262         job->callback(SIGABRT, job->arg);
263 }
264
265 /**
266  * Monitored normal callback for events.
267  * This function is called by the monitor
268  * to run the event loop when the safe environment
269  * is set.
270  * @param signum 0 on normal flow or the number
271  *               of the signal that interrupted the normal
272  *               flow
273  * @param arg     the events to run
274  */
275 static void events_call(int signum, void *arg)
276 {
277         int rc;
278         struct sd_event *se;
279         struct events *events = arg;
280
281         if (!signum) {
282                 se = events->event;
283                 rc = sd_event_prepare(se);
284                 if (rc < 0) {
285                         errno = -rc;
286                         ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(events->event));
287                 } else {
288                         if (rc == 0) {
289                                 rc = sd_event_wait(se, events->timeout);
290                                 if (rc < 0) {
291                                         errno = -rc;
292                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(events->event));
293                                 }
294                         }
295
296                         if (rc > 0) {
297                                 rc = sd_event_dispatch(se);
298                                 if (rc < 0) {
299                                         errno = -rc;
300                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(events->event));
301                                 }
302                         }
303                 }
304         }
305 }
306
307 /**
308  * Main processing loop of threads processing jobs.
309  * The loop must be called with the mutex locked
310  * and it returns with the mutex locked.
311  * @param me the description of the thread to use
312  * TODO: how are timeout handled when reentering?
313  */
314 static void thread_run(volatile struct thread *me)
315 {
316         struct thread **prv;
317         struct job *job;
318         struct events *events;
319         uint64_t evto;
320
321         /* initialize description of itself and link it in the list */
322         me->tid = pthread_self();
323         me->stop = 0;
324         me->waits = 0;
325         me->upper = current_thread;
326         if (current_thread) {
327                 evto = EVENT_TIMEOUT_CHILD;
328         } else {
329                 started++;
330                 sig_monitor_init_timeouts();
331                 evto = EVENT_TIMEOUT_TOP;
332         }
333         me->next = threads;
334         threads = (struct thread*)me;
335         current_thread = (struct thread*)me;
336
337         /* loop until stopped */
338         while (!me->stop) {
339                 /* get a job */
340                 job = job_get(first_job);
341                 if (job) {
342                         /* prepare running the job */
343                         remains++; /* increases count of job that can wait */
344                         job->blocked = 1; /* mark job as blocked */
345                         me->job = job; /* record the job (only for terminate) */
346
347                         /* run the job */
348                         pthread_mutex_unlock(&mutex);
349                         sig_monitor(job->timeout, job->callback, job->arg);
350                         pthread_mutex_lock(&mutex);
351
352                         /* release event if any */
353                         events = current_events;
354                         if (events && events->state == Modifiable) {
355                                 current_events = NULL;
356                                 events->state = Available;
357                         }
358
359                         /* release the run job */
360                         job_release(job);
361                 } else {
362                         /* no job, check events */
363                         events = current_events;
364                         if (!events)
365                                 events = events_get();
366                         else if (events->state == Locked)
367                                 events = 0;
368                         if (events) {
369                                 /* run the events */
370                                 events->state = Locked;
371                                 events->timeout = evto;
372                                 current_events = events;
373                                 pthread_mutex_unlock(&mutex);
374                                 sig_monitor(0, events_call, events);
375                                 pthread_mutex_lock(&mutex);
376                                 current_events = NULL;
377                                 events->state = Available;
378                         } else {
379                                 /* no job and not events */
380                                 waiting++;
381                                 me->waits = 1;
382                                 pthread_cond_wait(&cond, &mutex);
383                                 me->waits = 0;
384                                 waiting--;
385                         }
386                 }
387         }
388
389         /* unlink the current thread and cleanup */
390         prv = &threads;
391         while (*prv != me)
392                 prv = &(*prv)->next;
393         *prv = me->next;
394         current_thread = me->upper;
395         if (!current_thread) {
396                 sig_monitor_clean_timeouts();
397                 started--;
398         }
399 }
400
401 /**
402  * Entry point for created threads.
403  * @param data not used
404  * @return NULL
405  */
406 static void *thread_main(void *data)
407 {
408         struct thread me;
409
410         pthread_mutex_lock(&mutex);
411         thread_run(&me);
412         pthread_mutex_unlock(&mutex);
413         return NULL;
414 }
415
416 /**
417  * Starts a new thread
418  * @return 0 in case of success or -1 in case of error
419  */
420 static int start_one_thread()
421 {
422         pthread_t tid;
423         int rc;
424
425         rc = pthread_create(&tid, NULL, thread_main, NULL);
426         if (rc != 0) {
427                 /* errno = rc; */
428                 WARNING("not able to start thread: %m");
429                 rc = -1;
430         }
431         return rc;
432 }
433
434 /**
435  * Queues a new asynchronous job represented by 'callback' and 'arg'
436  * for the 'group' and the 'timeout'.
437  * Jobs are queued FIFO and are possibly executed in parallel
438  * concurrently except for job of the same group that are
439  * executed sequentially in FIFO order.
440  * @param group    The group of the job or NULL when no group.
441  * @param timeout  The maximum execution time in seconds of the job
442  *                 or 0 for unlimited time.
443  * @param callback The function to execute for achieving the job.
444  *                 Its first parameter is either 0 on normal flow
445  *                 or the signal number that broke the normal flow.
446  *                 The remaining parameter is the parameter 'arg1'
447  *                 given here.
448  * @param arg      The second argument for 'callback'
449  * @return 0 in case of success or -1 in case of error
450  */
451 int jobs_queue(
452                 const void *group,
453                 int timeout,
454                 void (*callback)(int, void*),
455                 void *arg)
456 {
457         const char *info;
458         struct job *job;
459         int rc;
460
461         pthread_mutex_lock(&mutex);
462
463         /* allocates the job */
464         job = job_create(group, timeout, callback, arg);
465         if (!job) {
466                 errno = ENOMEM;
467                 info = "out of memory";
468                 goto error;
469         }
470
471         /* check availability */
472         if (remains == 0) {
473                 errno = EBUSY;
474                 info = "too many jobs";
475                 goto error2;
476         }
477
478         /* start a thread if needed */
479         if (waiting == 0 && started < allowed) {
480                 /* all threads are busy and a new can be started */
481                 rc = start_one_thread();
482                 if (rc < 0 && started == 0) {
483                         info = "can't start first thread";
484                         goto error2;
485                 }
486         }
487
488         /* queues the job */
489         remains--;
490         job_add(job);
491
492         /* signal an existing job */
493         pthread_cond_signal(&cond);
494         pthread_mutex_unlock(&mutex);
495         return 0;
496
497 error2:
498         job->next = free_jobs;
499         free_jobs = job;
500 error:
501         ERROR("can't process job with threads: %s, %m", info);
502         pthread_mutex_unlock(&mutex);
503         return -1;
504 }
505
506 /**
507  * Internal helper function for 'jobs_enter'.
508  * @see jobs_enter, jobs_leave
509  */
510 static void enter_cb(int signum, void *closure)
511 {
512         struct sync *sync = closure;
513         sync->enter(signum, sync->arg, (void*)&sync->thread);
514 }
515
516 /**
517  * Internal helper function for 'jobs_call'.
518  * @see jobs_call
519  */
520 static void call_cb(int signum, void *closure)
521 {
522         struct sync *sync = closure;
523         sync->callback(signum, sync->arg);
524         jobs_leave((void*)&sync->thread);
525 }
526
527 /**
528  * Internal helper for synchronous jobs. It enters
529  * a new thread loop for evaluating the given job
530  * as recorded by the couple 'sync_cb' and 'sync'.
531  * @see jobs_call, jobs_enter, jobs_leave
532  */
533 static int do_sync(
534                 const void *group,
535                 int timeout,
536                 void (*sync_cb)(int signum, void *closure),
537                 struct sync *sync
538 )
539 {
540         struct job *job;
541
542         pthread_mutex_lock(&mutex);
543
544         /* allocates the job */
545         job = job_create(group, timeout, sync_cb, sync);
546         if (!job) {
547                 ERROR("out of memory");
548                 errno = ENOMEM;
549                 pthread_mutex_unlock(&mutex);
550                 return -1;
551         }
552
553         /* queues the job */
554         job_add(job);
555
556         /* run until stopped */
557         thread_run(&sync->thread);
558         pthread_mutex_unlock(&mutex);
559         return 0;
560 }
561
562 /**
563  * Enter a synchronisation point: activates the job given by 'callback'
564  * and 'closure' using 'group' and 'timeout' to control sequencing and
565  * execution time.
566  * @param group the group for sequencing jobs
567  * @param timeout the time in seconds allocated to the job
568  * @param callback the callback that will handle the job.
569  *                 it receives 3 parameters: 'signum' that will be 0
570  *                 on normal flow or the catched signal number in case
571  *                 of interrupted flow, the context 'closure' as given and
572  *                 a 'jobloop' reference that must be used when the job is
573  *                 terminated to unlock the current execution flow.
574  * @param arg the argument to the callback
575  * @return 0 on success or -1 in case of error
576  */
577 int jobs_enter(
578                 const void *group,
579                 int timeout,
580                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
581                 void *closure
582 )
583 {
584         struct sync sync;
585
586         sync.enter = callback;
587         sync.arg = closure;
588         return do_sync(group, timeout, enter_cb, &sync);
589 }
590
591 /**
592  * Unlocks the execution flow designed by 'jobloop'.
593  * @param jobloop indication of the flow to unlock
594  * @return 0 in case of success of -1 on error
595  */
596 int jobs_leave(struct jobloop *jobloop)
597 {
598         struct thread *t;
599
600         pthread_mutex_lock(&mutex);
601         t = threads;
602         while (t && t != (struct thread*)jobloop)
603                 t = t->next;
604         if (!t) {
605                 errno = EINVAL;
606         } else {
607                 t->stop = 1;
608                 if (t->waits)
609                         pthread_cond_broadcast(&cond);
610         }
611         pthread_mutex_unlock(&mutex);
612         return -!t;
613 }
614
615 /**
616  * Calls synchronously the job represented by 'callback' and 'arg1'
617  * for the 'group' and the 'timeout' and waits for its completion.
618  * @param group    The group of the job or NULL when no group.
619  * @param timeout  The maximum execution time in seconds of the job
620  *                 or 0 for unlimited time.
621  * @param callback The function to execute for achieving the job.
622  *                 Its first parameter is either 0 on normal flow
623  *                 or the signal number that broke the normal flow.
624  *                 The remaining parameter is the parameter 'arg1'
625  *                 given here.
626  * @param arg      The second argument for 'callback'
627  * @return 0 in case of success or -1 in case of error
628  */
629 int jobs_call(
630                 const void *group,
631                 int timeout,
632                 void (*callback)(int, void*),
633                 void *arg)
634 {
635         struct sync sync;
636
637         sync.callback = callback;
638         sync.arg = arg;
639
640         return do_sync(group, timeout, call_cb, &sync);
641 }
642
643 /**
644  * Gets a sd_event item for the current thread.
645  * @return a sd_event or NULL in case of error
646  */
647 struct sd_event *jobs_get_sd_event()
648 {
649         struct events *events;
650         int rc;
651
652         pthread_mutex_lock(&mutex);
653
654         /* search events on stack */
655         events = current_events;
656         if (!events) {
657                 /* search an available events */
658                 events = events_get();
659                 if (!events) {
660                         /* not found, check if creation possible */
661                         if (nevents >= allowed) {
662                                 ERROR("not possible to add a new event");
663                                 events = NULL;
664                         } else {
665                                 events = malloc(sizeof *events);
666                                 if (events && (rc = sd_event_new(&events->event)) >= 0) {
667                                         if (nevents < started || start_one_thread() >= 0) {
668                                                 events->state = Available;
669                                                 events->next = first_events;
670                                                 first_events = events;
671                                         } else {
672                                                 ERROR("can't start thread for events");
673                                                 sd_event_unref(events->event);
674                                                 free(events);
675                                                 events = NULL;
676                                         }
677                                 } else {
678                                         if (!events) {
679                                                 ERROR("out of memory");
680                                                 errno = ENOMEM;
681                                         } else {
682                                                 free(events);
683                                                 ERROR("creation of sd_event failed: %m");
684                                                 events = NULL;
685                                                 errno = -rc;
686                                         }
687                                 }
688                         }
689                 }
690                 if (events) {
691                         events->state = Modifiable;
692                         if (!current_thread)
693                                 WARNING("event returned for unknown thread!");
694                         current_events = events;
695                 }
696         }
697         pthread_mutex_unlock(&mutex);
698         return events ? events->event : NULL;
699 }
700
701 /**
702  * Enter the jobs processing loop.
703  * @param allowed_count Maximum count of thread for jobs including this one
704  * @param start_count   Count of thread to start now, must be lower.
705  * @param waiter_count  Maximum count of jobs that can be waiting.
706  * @param start         The start routine to activate (can't be NULL)
707  * @return 0 in case of success or -1 in case of error.
708  */
709 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum))
710 {
711         int rc, launched;
712         struct thread me;
713         struct job *job;
714
715         assert(allowed_count >= 1);
716         assert(start_count >= 0);
717         assert(waiter_count > 0);
718         assert(start_count <= allowed_count);
719
720         rc = -1;
721         pthread_mutex_lock(&mutex);
722
723         /* check whether already running */
724         if (current_thread || allowed) {
725                 ERROR("thread already started");
726                 errno = EINVAL;
727                 goto error;
728         }
729
730         /* start */
731         if (sig_monitor_init() < 0) {
732                 ERROR("failed to initialise signal handlers");
733                 goto error;
734         }
735
736         /* records the allowed count */
737         allowed = allowed_count;
738         started = 0;
739         waiting = 0;
740         remains = waiter_count;
741
742         /* start at least one thread */
743         launched = 0;
744         while ((launched + 1) < start_count) {
745                 if (start_one_thread() != 0) {
746                         ERROR("Not all threads can be started");
747                         goto error;
748                 }
749                 launched++;
750         }
751
752         /* queue the start job */
753         job = job_create(NULL, 0, (job_cb_t)start, NULL);
754         if (!job) {
755                 ERROR("out of memory");
756                 errno = ENOMEM;
757                 goto error;
758         }
759         job_add(job);
760         remains--;
761
762         /* run until end */
763         thread_run(&me);
764         rc = 0;
765 error:
766         pthread_mutex_unlock(&mutex);
767         return rc;
768 }
769
770 /**
771  * Terminate all the threads and cancel all pending jobs.
772  */
773 void jobs_terminate()
774 {
775         struct job *job, *head, *tail;
776         pthread_t me, *others;
777         struct thread *t;
778         int count;
779
780         /* how am i? */
781         me = pthread_self();
782
783         /* request all threads to stop */
784         pthread_mutex_lock(&mutex);
785         allowed = 0;
786
787         /* count the number of threads */
788         count = 0;
789         t = threads;
790         while (t) {
791                 if (!t->upper && !pthread_equal(t->tid, me))
792                         count++;
793                 t = t->next;
794         }
795
796         /* fill the array of threads */
797         others = alloca(count * sizeof *others);
798         count = 0;
799         t = threads;
800         while (t) {
801                 if (!t->upper && !pthread_equal(t->tid, me))
802                         others[count++] = t->tid;
803                 t = t->next;
804         }
805
806         /* stops the threads */
807         t = threads;
808         while (t) {
809                 t->stop = 1;
810                 t = t->next;
811         }
812
813         /* wait the threads */
814         pthread_cond_broadcast(&cond);
815         pthread_mutex_unlock(&mutex);
816         while (count)
817                 pthread_join(others[--count], NULL);
818         pthread_mutex_lock(&mutex);
819
820         /* cancel pending jobs of other threads */
821         remains = 0;
822         head = first_job;
823         first_job = NULL;
824         tail = NULL;
825         while (head) {
826                 /* unlink the job */
827                 job = head;
828                 head = job->next;
829
830                 /* search if job is stacked for current */
831                 t = current_thread;
832                 while (t && t->job != job)
833                         t = t->upper;
834                 if (t) {
835                         /* yes, relink it at end */
836                         if (tail)
837                                 tail->next = job;
838                         else
839                                 first_job = job;
840                         tail = job;
841                         job->next = NULL;
842                 } else {
843                         /* no cancel the job */
844                         pthread_mutex_unlock(&mutex);
845                         sig_monitor(0, job_cancel, job);
846                         free(job);
847                         pthread_mutex_lock(&mutex);
848                 }
849         }
850         pthread_mutex_unlock(&mutex);
851 }
852