Allow to remove systemd library
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016-2019 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <stdint.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <string.h>
25 #include <pthread.h>
26 #include <errno.h>
27 #include <assert.h>
28
29 #include "jobs.h"
30 #include "evmgr.h"
31 #include "sig-monitor.h"
32 #include "verbose.h"
33
34 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
35 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
36
37 struct thread;
38
39 /** Internal shortcut for callback */
40 typedef void (*job_cb_t)(int, void*);
41
42 /** Description of a pending job */
43 struct job
44 {
45         struct job *next;    /**< link to the next job enqueued */
46         const void *group;   /**< group of the request */
47         job_cb_t callback;   /**< processing callback */
48         void *arg;           /**< argument */
49         int timeout;         /**< timeout in second for processing the request */
50         unsigned blocked: 1; /**< is an other request blocking this one ? */
51         unsigned dropped: 1; /**< is removed ? */
52 };
53
54 /** Description of threads */
55 struct thread
56 {
57         struct thread *next;   /**< next thread of the list */
58         struct thread *upper;  /**< upper same thread */
59         struct thread *nholder;/**< next holder for evloop */
60         pthread_cond_t *cwhold;/**< condition wait for holding */
61         struct job *job;       /**< currently processed job */
62         pthread_t tid;         /**< the thread id */
63         volatile unsigned stop: 1;      /**< stop requested */
64         volatile unsigned waits: 1;     /**< is waiting? */
65         volatile unsigned leaved: 1;    /**< was leaved? */
66 };
67
68 /**
69  * Description of synchronous callback
70  */
71 struct sync
72 {
73         struct thread thread;   /**< thread loop data */
74         union {
75                 void (*callback)(int, void*);   /**< the synchronous callback */
76                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
77                                 /**< the entering synchronous routine */
78         };
79         void *arg;              /**< the argument of the callback */
80 };
81
82 /* synchronisation of threads */
83 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
84 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
85
86 /* count allowed, started and running threads */
87 static int allowed = 0; /** allowed count of threads */
88 static int started = 0; /** started count of threads */
89 static int running = 0; /** running count of threads */
90 static int remains = 0; /** allowed count of waiting jobs */
91
92 /* list of threads */
93 static struct thread *threads;
94 static _Thread_local struct thread *current_thread;
95
96 /* queue of pending jobs */
97 static struct job *first_job;
98 static struct job *free_jobs;
99
100 /* event loop */
101 static struct evmgr *evmgr;
102
103 static void (*exit_handler)();
104
105 /**
106  * Create a new job with the given parameters
107  * @param group    the group of the job
108  * @param timeout  the timeout of the job (0 if none)
109  * @param callback the function that achieves the job
110  * @param arg      the argument of the callback
111  * @return the created job unblock or NULL when no more memory
112  */
113 static struct job *job_create(
114                 const void *group,
115                 int timeout,
116                 job_cb_t callback,
117                 void *arg)
118 {
119         struct job *job;
120
121         /* try recyle existing job */
122         job = free_jobs;
123         if (job)
124                 free_jobs = job->next;
125         else {
126                 /* allocation without blocking */
127                 pthread_mutex_unlock(&mutex);
128                 job = malloc(sizeof *job);
129                 pthread_mutex_lock(&mutex);
130                 if (!job) {
131                         ERROR("out of memory");
132                         errno = ENOMEM;
133                         goto end;
134                 }
135         }
136         /* initialises the job */
137         job->group = group;
138         job->timeout = timeout;
139         job->callback = callback;
140         job->arg = arg;
141         job->blocked = 0;
142         job->dropped = 0;
143 end:
144         return job;
145 }
146
147 /**
148  * Adds 'job' at the end of the list of jobs, marking it
149  * as blocked if an other job with the same group is pending.
150  * @param job the job to add
151  */
152 static void job_add(struct job *job)
153 {
154         const void *group;
155         struct job *ijob, **pjob;
156
157         /* prepare to add */
158         group = job->group;
159         job->next = NULL;
160
161         /* search end and blockers */
162         pjob = &first_job;
163         ijob = first_job;
164         while (ijob) {
165                 if (group && ijob->group == group)
166                         job->blocked = 1;
167                 pjob = &ijob->next;
168                 ijob = ijob->next;
169         }
170
171         /* queue the jobs */
172         *pjob = job;
173         remains--;
174 }
175
176 /**
177  * Get the next job to process or NULL if none.
178  * @return the first job that isn't blocked or NULL
179  */
180 static inline struct job *job_get()
181 {
182         struct job *job = first_job;
183         while (job && job->blocked)
184                 job = job->next;
185         if (job)
186                 remains++;
187         return job;
188 }
189
190 /**
191  * Releases the processed 'job': removes it
192  * from the list of jobs and unblock the first
193  * pending job of the same group if any.
194  * @param job the job to release
195  */
196 static inline void job_release(struct job *job)
197 {
198         struct job *ijob, **pjob;
199         const void *group;
200
201         /* first unqueue the job */
202         pjob = &first_job;
203         ijob = first_job;
204         while (ijob != job) {
205                 pjob = &ijob->next;
206                 ijob = ijob->next;
207         }
208         *pjob = job->next;
209
210         /* then unblock jobs of the same group */
211         group = job->group;
212         if (group) {
213                 ijob = job->next;
214                 while (ijob && ijob->group != group)
215                         ijob = ijob->next;
216                 if (ijob)
217                         ijob->blocked = 0;
218         }
219
220         /* recycle the job */
221         job->next = free_jobs;
222         free_jobs = job;
223 }
224
225 /**
226  * Monitored cancel callback for a job.
227  * This function is called by the monitor
228  * to cancel the job when the safe environment
229  * is set.
230  * @param signum 0 on normal flow or the number
231  *               of the signal that interrupted the normal
232  *               flow, isn't used
233  * @param arg    the job to run
234  */
235 __attribute__((unused))
236 static void job_cancel(int signum, void *arg)
237 {
238         struct job *job = arg;
239         job->callback(SIGABRT, job->arg);
240 }
241
242 /**
243  * wakeup the event loop if needed by sending
244  * an event.
245  */
246 static void evloop_wakeup()
247 {
248         if (evmgr)
249                 evmgr_wakeup(evmgr);
250 }
251
252 /**
253  * Release the currently held event loop
254  */
255 static void evloop_release()
256 {
257         struct thread *nh, *ct = current_thread;
258
259         if (ct && evmgr && evmgr_release_if(evmgr, ct)) {
260                 nh = ct->nholder;
261                 ct->nholder = 0;
262                 if (nh) {
263                         evmgr_try_hold(evmgr, nh);
264                         pthread_cond_signal(nh->cwhold);
265                 }
266         }
267 }
268
269 /**
270  * get the eventloop for the current thread
271  */
272 static int evloop_get()
273 {
274         return evmgr && evmgr_try_hold(evmgr, current_thread);
275 }
276
277 /**
278  * acquire the eventloop for the current thread
279  */
280 static void evloop_acquire()
281 {
282         struct thread *pwait, *ct;
283         pthread_cond_t cond;
284
285         /* try to get the evloop */
286         if (!evloop_get()) {
287                 /* failed, init waiting state */
288                 ct = current_thread;
289                 ct->nholder = NULL;
290                 ct->cwhold = &cond;
291                 pthread_cond_init(&cond, NULL);
292
293                 /* queue current thread in holder list */
294                 pwait = evmgr_holder(evmgr);
295                 while (pwait->nholder)
296                         pwait = pwait->nholder;
297                 pwait->nholder = ct;
298
299                 /* wake up the evloop */
300                 evloop_wakeup();
301
302                 /* wait to acquire the evloop */
303                 pthread_cond_wait(&cond, &mutex);
304                 pthread_cond_destroy(&cond);
305         }
306 }
307
308 /**
309  * Enter the thread
310  * @param me the description of the thread to enter
311  */
312 static void thread_enter(volatile struct thread *me)
313 {
314         evloop_release();
315         /* initialize description of itself and link it in the list */
316         me->tid = pthread_self();
317         me->stop = 0;
318         me->waits = 0;
319         me->leaved = 0;
320         me->nholder = 0;
321         me->upper = current_thread;
322         me->next = threads;
323         threads = (struct thread*)me;
324         current_thread = (struct thread*)me;
325 }
326
327 /**
328  * leave the thread
329  * @param me the description of the thread to leave
330  */
331 static void thread_leave()
332 {
333         struct thread **prv, *me;
334
335         /* unlink the current thread and cleanup */
336         me = current_thread;
337         prv = &threads;
338         while (*prv != me)
339                 prv = &(*prv)->next;
340         *prv = me->next;
341
342         current_thread = me->upper;
343 }
344
345 /**
346  * Main processing loop of internal threads with processing jobs.
347  * The loop must be called with the mutex locked
348  * and it returns with the mutex locked.
349  * @param me the description of the thread to use
350  * TODO: how are timeout handled when reentering?
351  */
352 static void thread_run_internal(volatile struct thread *me)
353 {
354         struct job *job;
355
356         /* enter thread */
357         thread_enter(me);
358
359         /* loop until stopped */
360         while (!me->stop) {
361                 /* release the current event loop */
362                 evloop_release();
363
364                 /* get a job */
365                 job = job_get();
366                 if (job) {
367                         /* prepare running the job */
368                         job->blocked = 1; /* mark job as blocked */
369                         me->job = job; /* record the job (only for terminate) */
370
371                         /* run the job */
372                         pthread_mutex_unlock(&mutex);
373                         sig_monitor(job->timeout, job->callback, job->arg);
374                         pthread_mutex_lock(&mutex);
375
376                         /* release the run job */
377                         job_release(job);
378                 /* no job, check event loop wait */
379                 } else if (evloop_get()) {
380                         if (!evmgr_can_run(evmgr)) {
381                                 /* busy ? */
382                                 CRITICAL("Can't enter dispatch while in dispatch!");
383                                 abort();
384                         }
385                         /* run the events */
386                         pthread_mutex_unlock(&mutex);
387                         sig_monitor(0, (void(*)(int,void*))evmgr_job_run, evmgr);
388                         pthread_mutex_lock(&mutex);
389                 } else {
390                         /* no job and no event loop */
391                         running--;
392                         if (!running)
393                                 ERROR("Entering job deep sleep! Check your bindings.");
394                         me->waits = 1;
395                         pthread_cond_wait(&cond, &mutex);
396                         me->waits = 0;
397                         running++;
398                 }
399         }
400         /* cleanup */
401         evloop_release();
402         thread_leave();
403 }
404
405 /**
406  * Main processing loop of external threads.
407  * The loop must be called with the mutex locked
408  * and it returns with the mutex locked.
409  * @param me the description of the thread to use
410  */
411 static void thread_run_external(volatile struct thread *me)
412 {
413         /* enter thread */
414         thread_enter(me);
415
416         /* loop until stopped */
417         me->waits = 1;
418         while (!me->stop)
419                 pthread_cond_wait(&cond, &mutex);
420         me->waits = 0;
421         thread_leave();
422 }
423
424 /**
425  * Root for created threads.
426  */
427 static void thread_main()
428 {
429         struct thread me;
430
431         running++;
432         started++;
433         sig_monitor_init_timeouts();
434         thread_run_internal(&me);
435         sig_monitor_clean_timeouts();
436         started--;
437         running--;
438 }
439
440 /**
441  * Entry point for created threads.
442  * @param data not used
443  * @return NULL
444  */
445 static void *thread_starter(void *data)
446 {
447         pthread_mutex_lock(&mutex);
448         thread_main();
449         pthread_mutex_unlock(&mutex);
450         return NULL;
451 }
452
453 /**
454  * Starts a new thread
455  * @return 0 in case of success or -1 in case of error
456  */
457 static int start_one_thread()
458 {
459         pthread_t tid;
460         int rc;
461
462         rc = pthread_create(&tid, NULL, thread_starter, NULL);
463         if (rc != 0) {
464                 /* errno = rc; */
465                 WARNING("not able to start thread: %m");
466                 rc = -1;
467         }
468         return rc;
469 }
470
471 /**
472  * Queues a new asynchronous job represented by 'callback' and 'arg'
473  * for the 'group' and the 'timeout'.
474  * Jobs are queued FIFO and are possibly executed in parallel
475  * concurrently except for job of the same group that are
476  * executed sequentially in FIFO order.
477  * @param group    The group of the job or NULL when no group.
478  * @param timeout  The maximum execution time in seconds of the job
479  *                 or 0 for unlimited time.
480  * @param callback The function to execute for achieving the job.
481  *                 Its first parameter is either 0 on normal flow
482  *                 or the signal number that broke the normal flow.
483  *                 The remaining parameter is the parameter 'arg1'
484  *                 given here.
485  * @param arg      The second argument for 'callback'
486  * @param start    Allow to start a thread if not zero
487  * @return 0 in case of success or -1 in case of error
488  */
489 static int queue_job(
490                 const void *group,
491                 int timeout,
492                 void (*callback)(int, void*),
493                 void *arg,
494                 int start)
495 {
496         struct job *job;
497         int rc;
498
499         pthread_mutex_lock(&mutex);
500
501         /* allocates the job */
502         job = job_create(group, timeout, callback, arg);
503         if (!job)
504                 goto error;
505
506         /* check availability */
507         if (remains <= 0) {
508                 ERROR("can't process job with threads: too many jobs");
509                 errno = EBUSY;
510                 goto error2;
511         }
512
513         /* start a thread if needed */
514         if (start && running == started && started < allowed) {
515                 /* all threads are busy and a new can be started */
516                 rc = start_one_thread();
517                 if (rc < 0 && started == 0) {
518                         ERROR("can't start initial thread: %m");
519                         goto error2;
520                 }
521         }
522
523         /* queues the job */
524         job_add(job);
525
526         /* signal an existing job */
527         pthread_cond_signal(&cond);
528         pthread_mutex_unlock(&mutex);
529         return 0;
530
531 error2:
532         job->next = free_jobs;
533         free_jobs = job;
534 error:
535         pthread_mutex_unlock(&mutex);
536         return -1;
537 }
538
539 /**
540  * Queues a new asynchronous job represented by 'callback' and 'arg'
541  * for the 'group' and the 'timeout'.
542  * Jobs are queued FIFO and are possibly executed in parallel
543  * concurrently except for job of the same group that are
544  * executed sequentially in FIFO order.
545  * @param group    The group of the job or NULL when no group.
546  * @param timeout  The maximum execution time in seconds of the job
547  *                 or 0 for unlimited time.
548  * @param callback The function to execute for achieving the job.
549  *                 Its first parameter is either 0 on normal flow
550  *                 or the signal number that broke the normal flow.
551  *                 The remaining parameter is the parameter 'arg1'
552  *                 given here.
553  * @param arg      The second argument for 'callback'
554  * @return 0 in case of success or -1 in case of error
555  */
556 int jobs_queue(
557                 const void *group,
558                 int timeout,
559                 void (*callback)(int, void*),
560                 void *arg)
561 {
562         return queue_job(group, timeout, callback, arg, 1);
563 }
564
565 /**
566  * Internal helper function for 'jobs_enter'.
567  * @see jobs_enter, jobs_leave
568  */
569 static void enter_cb(int signum, void *closure)
570 {
571         struct sync *sync = closure;
572         sync->enter(signum, sync->arg, (void*)&sync->thread);
573 }
574
575 /**
576  * Internal helper function for 'jobs_call'.
577  * @see jobs_call
578  */
579 static void call_cb(int signum, void *closure)
580 {
581         struct sync *sync = closure;
582         sync->callback(signum, sync->arg);
583         jobs_leave((void*)&sync->thread);
584 }
585
586 /**
587  * Internal helper for synchronous jobs. It enters
588  * a new thread loop for evaluating the given job
589  * as recorded by the couple 'sync_cb' and 'sync'.
590  * @see jobs_call, jobs_enter, jobs_leave
591  */
592 static int do_sync(
593                 const void *group,
594                 int timeout,
595                 void (*sync_cb)(int signum, void *closure),
596                 struct sync *sync
597 )
598 {
599         struct job *job;
600
601         pthread_mutex_lock(&mutex);
602
603         /* allocates the job */
604         job = job_create(group, timeout, sync_cb, sync);
605         if (!job) {
606                 pthread_mutex_unlock(&mutex);
607                 return -1;
608         }
609
610         /* queues the job */
611         job_add(job);
612
613         /* run until stopped */
614         if (current_thread)
615                 thread_run_internal(&sync->thread);
616         else
617                 thread_run_external(&sync->thread);
618         pthread_mutex_unlock(&mutex);
619         if (sync->thread.leaved)
620                 return 0;
621         errno = EINTR;
622         return -1;
623 }
624
625 /**
626  * Enter a synchronisation point: activates the job given by 'callback'
627  * and 'closure' using 'group' and 'timeout' to control sequencing and
628  * execution time.
629  * @param group the group for sequencing jobs
630  * @param timeout the time in seconds allocated to the job
631  * @param callback the callback that will handle the job.
632  *                 it receives 3 parameters: 'signum' that will be 0
633  *                 on normal flow or the catched signal number in case
634  *                 of interrupted flow, the context 'closure' as given and
635  *                 a 'jobloop' reference that must be used when the job is
636  *                 terminated to unlock the current execution flow.
637  * @param closure the argument to the callback
638  * @return 0 on success or -1 in case of error
639  */
640 int jobs_enter(
641                 const void *group,
642                 int timeout,
643                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
644                 void *closure
645 )
646 {
647         struct sync sync;
648
649         sync.enter = callback;
650         sync.arg = closure;
651         return do_sync(group, timeout, enter_cb, &sync);
652 }
653
654 /**
655  * Unlocks the execution flow designed by 'jobloop'.
656  * @param jobloop indication of the flow to unlock
657  * @return 0 in case of success of -1 on error
658  */
659 int jobs_leave(struct jobloop *jobloop)
660 {
661         struct thread *t;
662
663         pthread_mutex_lock(&mutex);
664         t = threads;
665         while (t && t != (struct thread*)jobloop)
666                 t = t->next;
667         if (!t) {
668                 errno = EINVAL;
669         } else {
670                 t->leaved = 1;
671                 t->stop = 1;
672                 if (t->waits)
673                         pthread_cond_broadcast(&cond);
674                 else
675                         evloop_wakeup();
676         }
677         pthread_mutex_unlock(&mutex);
678         return -!t;
679 }
680
681 /**
682  * Calls synchronously the job represented by 'callback' and 'arg1'
683  * for the 'group' and the 'timeout' and waits for its completion.
684  * @param group    The group of the job or NULL when no group.
685  * @param timeout  The maximum execution time in seconds of the job
686  *                 or 0 for unlimited time.
687  * @param callback The function to execute for achieving the job.
688  *                 Its first parameter is either 0 on normal flow
689  *                 or the signal number that broke the normal flow.
690  *                 The remaining parameter is the parameter 'arg1'
691  *                 given here.
692  * @param arg      The second argument for 'callback'
693  * @return 0 in case of success or -1 in case of error
694  */
695 int jobs_call(
696                 const void *group,
697                 int timeout,
698                 void (*callback)(int, void*),
699                 void *arg)
700 {
701         struct sync sync;
702
703         sync.callback = callback;
704         sync.arg = arg;
705
706         return do_sync(group, timeout, call_cb, &sync);
707 }
708
709 /**
710  * Ensure that the current running thread can control the event loop.
711  */
712 struct evmgr *jobs_acquire_event_manager()
713 {
714         struct thread lt;
715
716         /* ensure an existing thread environment */
717         if (!current_thread) {
718                 memset(&lt, 0, sizeof lt);
719                 current_thread = &lt;
720         }
721
722         /* lock */
723         pthread_mutex_lock(&mutex);
724
725         /* creates the evloop on need */
726         if (!evmgr)
727                 evmgr_create(&evmgr);
728
729         /* acquire the event loop under lock */
730         if (evmgr)
731                 evloop_acquire();
732
733         /* unlock */
734         pthread_mutex_unlock(&mutex);
735
736         /* release the faked thread environment if needed */
737         if (current_thread == &lt) {
738                 /*
739                  * Releasing it is needed because there is no way to guess
740                  * when it has to be released really. But here is where it is
741                  * hazardous: if the caller modifies the eventloop when it
742                  * is waiting, there is no way to make the change effective.
743                  * A workaround to achieve that goal is for the caller to
744                  * require the event loop a second time after having modified it.
745                  */
746                 NOTICE("Requiring event manager/loop from outside of binder's callback is hazardous!");
747                 if (verbose_wants(Log_Level_Info))
748                         sig_monitor_dumpstack();
749                 evloop_release();
750                 current_thread = NULL;
751         }
752         return evmgr;
753 }
754
755 /**
756  * Enter the jobs processing loop.
757  * @param allowed_count Maximum count of thread for jobs including this one
758  * @param start_count   Count of thread to start now, must be lower.
759  * @param waiter_count  Maximum count of jobs that can be waiting.
760  * @param start         The start routine to activate (can't be NULL)
761  * @return 0 in case of success or -1 in case of error.
762  */
763 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
764 {
765         int rc, launched;
766         struct job *job;
767
768         assert(allowed_count >= 1);
769         assert(start_count >= 0);
770         assert(waiter_count > 0);
771         assert(start_count <= allowed_count);
772
773         rc = -1;
774         pthread_mutex_lock(&mutex);
775
776         /* check whether already running */
777         if (current_thread || allowed) {
778                 ERROR("thread already started");
779                 errno = EINVAL;
780                 goto error;
781         }
782
783         /* records the allowed count */
784         allowed = allowed_count;
785         started = 0;
786         running = 0;
787         remains = waiter_count;
788
789         /* start at least one thread: the current one */
790         launched = 1;
791         while (launched < start_count) {
792                 if (start_one_thread() != 0) {
793                         ERROR("Not all threads can be started");
794                         goto error;
795                 }
796                 launched++;
797         }
798
799         /* queue the start job */
800         job = job_create(NULL, 0, start, arg);
801         if (!job)
802                 goto error;
803         job_add(job);
804
805         /* run until end */
806         thread_main();
807         rc = 0;
808 error:
809         pthread_mutex_unlock(&mutex);
810         if (exit_handler)
811                 exit_handler();
812         return rc;
813 }
814
815 /**
816  * Exit jobs threads and call handler if not NULL.
817  */
818 void jobs_exit(void (*handler)())
819 {
820         struct thread *t;
821
822         /* request all threads to stop */
823         pthread_mutex_lock(&mutex);
824
825         /* set the handler */
826         exit_handler = handler;
827
828         /* stops the threads */
829         t = threads;
830         while (t) {
831                 t->stop = 1;
832                 t = t->next;
833         }
834
835         /* wait the threads */
836         pthread_cond_broadcast(&cond);
837
838         /* leave */
839         pthread_mutex_unlock(&mutex);
840 }