evmgr: Isolate the event loop from jobs
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016-2019 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <stdint.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <string.h>
25 #include <time.h>
26 #include <sys/syscall.h>
27 #include <pthread.h>
28 #include <errno.h>
29 #include <assert.h>
30 #include <sys/eventfd.h>
31
32 #include <systemd/sd-event.h>
33
34 #include "jobs.h"
35 #include "evmgr.h"
36 #include "sig-monitor.h"
37 #include "verbose.h"
38 #include "systemd.h"
39
40 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
41 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
42
43 struct thread;
44
45 /** Internal shortcut for callback */
46 typedef void (*job_cb_t)(int, void*);
47
48 /** Description of a pending job */
49 struct job
50 {
51         struct job *next;    /**< link to the next job enqueued */
52         const void *group;   /**< group of the request */
53         job_cb_t callback;   /**< processing callback */
54         void *arg;           /**< argument */
55         int timeout;         /**< timeout in second for processing the request */
56         unsigned blocked: 1; /**< is an other request blocking this one ? */
57         unsigned dropped: 1; /**< is removed ? */
58 };
59
60 /** Description of threads */
61 struct thread
62 {
63         struct thread *next;   /**< next thread of the list */
64         struct thread *upper;  /**< upper same thread */
65         struct thread *nholder;/**< next holder for evloop */
66         pthread_cond_t *cwhold;/**< condition wait for holding */
67         struct job *job;       /**< currently processed job */
68         pthread_t tid;         /**< the thread id */
69         volatile unsigned stop: 1;      /**< stop requested */
70         volatile unsigned waits: 1;     /**< is waiting? */
71 };
72
73 /**
74  * Description of synchronous callback
75  */
76 struct sync
77 {
78         struct thread thread;   /**< thread loop data */
79         union {
80                 void (*callback)(int, void*);   /**< the synchronous callback */
81                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
82                                 /**< the entering synchronous routine */
83         };
84         void *arg;              /**< the argument of the callback */
85 };
86
87
88 /* synchronisation of threads */
89 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
90 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
91
92 /* count allowed, started and running threads */
93 static int allowed = 0; /** allowed count of threads */
94 static int started = 0; /** started count of threads */
95 static int running = 0; /** running count of threads */
96 static int remains = 0; /** allowed count of waiting jobs */
97
98 /* list of threads */
99 static struct thread *threads;
100 static _Thread_local struct thread *current_thread;
101
102 /* queue of pending jobs */
103 static struct job *first_job;
104 static struct job *free_jobs;
105
106 /* event loop */
107 static struct evmgr *evmgr;
108
109 /**
110  * Create a new job with the given parameters
111  * @param group    the group of the job
112  * @param timeout  the timeout of the job (0 if none)
113  * @param callback the function that achieves the job
114  * @param arg      the argument of the callback
115  * @return the created job unblock or NULL when no more memory
116  */
117 static struct job *job_create(
118                 const void *group,
119                 int timeout,
120                 job_cb_t callback,
121                 void *arg)
122 {
123         struct job *job;
124
125         /* try recyle existing job */
126         job = free_jobs;
127         if (job)
128                 free_jobs = job->next;
129         else {
130                 /* allocation without blocking */
131                 pthread_mutex_unlock(&mutex);
132                 job = malloc(sizeof *job);
133                 pthread_mutex_lock(&mutex);
134                 if (!job) {
135                         ERROR("out of memory");
136                         errno = ENOMEM;
137                         goto end;
138                 }
139         }
140         /* initialises the job */
141         job->group = group;
142         job->timeout = timeout;
143         job->callback = callback;
144         job->arg = arg;
145         job->blocked = 0;
146         job->dropped = 0;
147 end:
148         return job;
149 }
150
151 /**
152  * Adds 'job' at the end of the list of jobs, marking it
153  * as blocked if an other job with the same group is pending.
154  * @param job the job to add
155  */
156 static void job_add(struct job *job)
157 {
158         const void *group;
159         struct job *ijob, **pjob;
160
161         /* prepare to add */
162         group = job->group;
163         job->next = NULL;
164
165         /* search end and blockers */
166         pjob = &first_job;
167         ijob = first_job;
168         while (ijob) {
169                 if (group && ijob->group == group)
170                         job->blocked = 1;
171                 pjob = &ijob->next;
172                 ijob = ijob->next;
173         }
174
175         /* queue the jobs */
176         *pjob = job;
177         remains--;
178 }
179
180 /**
181  * Get the next job to process or NULL if none.
182  * @return the first job that isn't blocked or NULL
183  */
184 static inline struct job *job_get()
185 {
186         struct job *job = first_job;
187         while (job && job->blocked)
188                 job = job->next;
189         if (job)
190                 remains++;
191         return job;
192 }
193
194 /**
195  * Releases the processed 'job': removes it
196  * from the list of jobs and unblock the first
197  * pending job of the same group if any.
198  * @param job the job to release
199  */
200 static inline void job_release(struct job *job)
201 {
202         struct job *ijob, **pjob;
203         const void *group;
204
205         /* first unqueue the job */
206         pjob = &first_job;
207         ijob = first_job;
208         while (ijob != job) {
209                 pjob = &ijob->next;
210                 ijob = ijob->next;
211         }
212         *pjob = job->next;
213
214         /* then unblock jobs of the same group */
215         group = job->group;
216         if (group) {
217                 ijob = job->next;
218                 while (ijob && ijob->group != group)
219                         ijob = ijob->next;
220                 if (ijob)
221                         ijob->blocked = 0;
222         }
223
224         /* recycle the job */
225         job->next = free_jobs;
226         free_jobs = job;
227 }
228
229 /**
230  * Monitored cancel callback for a job.
231  * This function is called by the monitor
232  * to cancel the job when the safe environment
233  * is set.
234  * @param signum 0 on normal flow or the number
235  *               of the signal that interrupted the normal
236  *               flow, isn't used
237  * @param arg    the job to run
238  */
239 static void job_cancel(int signum, void *arg)
240 {
241         struct job *job = arg;
242         job->callback(SIGABRT, job->arg);
243 }
244
245 /**
246  * wakeup the event loop if needed by sending
247  * an event.
248  */
249 static void evloop_wakeup()
250 {
251         if (evmgr)
252                 evmgr_wakeup(evmgr);
253 }
254
255 /**
256  * Release the currently held event loop
257  */
258 static void evloop_release()
259 {
260         struct thread *nh, *ct = current_thread;
261
262         if (ct && evmgr && evmgr_release_if(evmgr, ct)) {
263                 nh = ct->nholder;
264                 ct->nholder = 0;
265                 if (nh) {
266                         evmgr_try_hold(evmgr, nh);
267                         pthread_cond_signal(nh->cwhold);
268                 }
269         }
270 }
271
272 /**
273  * get the eventloop for the current thread
274  */
275 static int evloop_get()
276 {
277         return evmgr && evmgr_try_hold(evmgr, current_thread);
278 }
279
280 /**
281  * acquire the eventloop for the current thread
282  */
283 static void evloop_acquire()
284 {
285         struct thread *pwait, *ct;
286         pthread_cond_t cond;
287
288         /* try to get the evloop */
289         if (!evloop_get()) {
290                 /* failed, init waiting state */
291                 ct = current_thread;
292                 ct->nholder = NULL;
293                 ct->cwhold = &cond;
294                 pthread_cond_init(&cond, NULL);
295
296                 /* queue current thread in holder list */
297                 pwait = evmgr_holder(evmgr);
298                 while (pwait->nholder)
299                         pwait = pwait->nholder;
300                 pwait->nholder = ct;
301
302                 /* wake up the evloop */
303                 evloop_wakeup();
304
305                 /* wait to acquire the evloop */
306                 pthread_cond_wait(&cond, &mutex);
307                 pthread_cond_destroy(&cond);
308         }
309 }
310
311 /**
312  * Enter the thread
313  * @param me the description of the thread to enter
314  */
315 static void thread_enter(volatile struct thread *me)
316 {
317         evloop_release();
318         /* initialize description of itself and link it in the list */
319         me->tid = pthread_self();
320         me->stop = 0;
321         me->waits = 0;
322         me->nholder = 0;
323         me->upper = current_thread;
324         me->next = threads;
325         threads = (struct thread*)me;
326         current_thread = (struct thread*)me;
327 }
328
329 /**
330  * leave the thread
331  * @param me the description of the thread to leave
332  */
333 static void thread_leave()
334 {
335         struct thread **prv, *me;
336
337         /* unlink the current thread and cleanup */
338         me = current_thread;
339         prv = &threads;
340         while (*prv != me)
341                 prv = &(*prv)->next;
342         *prv = me->next;
343
344         current_thread = me->upper;
345 }
346
347 /**
348  * Main processing loop of internal threads with processing jobs.
349  * The loop must be called with the mutex locked
350  * and it returns with the mutex locked.
351  * @param me the description of the thread to use
352  * TODO: how are timeout handled when reentering?
353  */
354 static void thread_run_internal(volatile struct thread *me)
355 {
356         struct job *job;
357
358         /* enter thread */
359         thread_enter(me);
360
361         /* loop until stopped */
362         while (!me->stop) {
363                 /* release the current event loop */
364                 evloop_release();
365
366                 /* get a job */
367                 job = job_get();
368                 if (job) {
369                         /* prepare running the job */
370                         job->blocked = 1; /* mark job as blocked */
371                         me->job = job; /* record the job (only for terminate) */
372
373                         /* run the job */
374                         pthread_mutex_unlock(&mutex);
375                         sig_monitor(job->timeout, job->callback, job->arg);
376                         pthread_mutex_lock(&mutex);
377
378                         /* release the run job */
379                         job_release(job);
380                 /* no job, check event loop wait */
381                 } else if (evloop_get()) {
382                         if (!evmgr_can_run(evmgr)) {
383                                 /* busy ? */
384                                 CRITICAL("Can't enter dispatch while in dispatch!");
385                                 abort();
386                         }
387                         /* run the events */
388                         pthread_mutex_unlock(&mutex);
389                         sig_monitor(0, (void(*)(int,void*))evmgr_job_run, evmgr);
390                         pthread_mutex_lock(&mutex);
391                 } else {
392                         /* no job and no event loop */
393                         running--;
394                         if (!running)
395                                 ERROR("Entering job deep sleep! Check your bindings.");
396                         me->waits = 1;
397                         pthread_cond_wait(&cond, &mutex);
398                         me->waits = 0;
399                         running++;
400                 }
401         }
402         /* cleanup */
403         evloop_release();
404         thread_leave();
405 }
406
407 /**
408  * Main processing loop of external threads.
409  * The loop must be called with the mutex locked
410  * and it returns with the mutex locked.
411  * @param me the description of the thread to use
412  */
413 static void thread_run_external(volatile struct thread *me)
414 {
415         /* enter thread */
416         thread_enter(me);
417
418         /* loop until stopped */
419         me->waits = 1;
420         while (!me->stop)
421                 pthread_cond_wait(&cond, &mutex);
422         me->waits = 0;
423         thread_leave();
424 }
425
426 /**
427  * Root for created threads.
428  */
429 static void thread_main()
430 {
431         struct thread me;
432
433         running++;
434         started++;
435         sig_monitor_init_timeouts();
436         thread_run_internal(&me);
437         sig_monitor_clean_timeouts();
438         started--;
439         running--;
440 }
441
442 /**
443  * Entry point for created threads.
444  * @param data not used
445  * @return NULL
446  */
447 static void *thread_starter(void *data)
448 {
449         pthread_mutex_lock(&mutex);
450         thread_main();
451         pthread_mutex_unlock(&mutex);
452         return NULL;
453 }
454
455 /**
456  * Starts a new thread
457  * @return 0 in case of success or -1 in case of error
458  */
459 static int start_one_thread()
460 {
461         pthread_t tid;
462         int rc;
463
464         rc = pthread_create(&tid, NULL, thread_starter, NULL);
465         if (rc != 0) {
466                 /* errno = rc; */
467                 WARNING("not able to start thread: %m");
468                 rc = -1;
469         }
470         return rc;
471 }
472
473 /**
474  * Queues a new asynchronous job represented by 'callback' and 'arg'
475  * for the 'group' and the 'timeout'.
476  * Jobs are queued FIFO and are possibly executed in parallel
477  * concurrently except for job of the same group that are
478  * executed sequentially in FIFO order.
479  * @param group    The group of the job or NULL when no group.
480  * @param timeout  The maximum execution time in seconds of the job
481  *                 or 0 for unlimited time.
482  * @param callback The function to execute for achieving the job.
483  *                 Its first parameter is either 0 on normal flow
484  *                 or the signal number that broke the normal flow.
485  *                 The remaining parameter is the parameter 'arg1'
486  *                 given here.
487  * @param arg      The second argument for 'callback'
488  * @return 0 in case of success or -1 in case of error
489  */
490 int jobs_queue(
491                 const void *group,
492                 int timeout,
493                 void (*callback)(int, void*),
494                 void *arg)
495 {
496         struct job *job;
497         int rc;
498
499         pthread_mutex_lock(&mutex);
500
501         /* allocates the job */
502         job = job_create(group, timeout, callback, arg);
503         if (!job)
504                 goto error;
505
506         /* check availability */
507         if (remains <= 0) {
508                 ERROR("can't process job with threads: too many jobs");
509                 errno = EBUSY;
510                 goto error2;
511         }
512
513         /* start a thread if needed */
514         if (running == started && started < allowed) {
515                 /* all threads are busy and a new can be started */
516                 rc = start_one_thread();
517                 if (rc < 0 && started == 0) {
518                         ERROR("can't start initial thread: %m");
519                         goto error2;
520                 }
521         }
522
523         /* queues the job */
524         job_add(job);
525
526         /* signal an existing job */
527         pthread_cond_signal(&cond);
528         pthread_mutex_unlock(&mutex);
529         return 0;
530
531 error2:
532         job->next = free_jobs;
533         free_jobs = job;
534 error:
535         pthread_mutex_unlock(&mutex);
536         return -1;
537 }
538
539 /**
540  * Internal helper function for 'jobs_enter'.
541  * @see jobs_enter, jobs_leave
542  */
543 static void enter_cb(int signum, void *closure)
544 {
545         struct sync *sync = closure;
546         sync->enter(signum, sync->arg, (void*)&sync->thread);
547 }
548
549 /**
550  * Internal helper function for 'jobs_call'.
551  * @see jobs_call
552  */
553 static void call_cb(int signum, void *closure)
554 {
555         struct sync *sync = closure;
556         sync->callback(signum, sync->arg);
557         jobs_leave((void*)&sync->thread);
558 }
559
560 /**
561  * Internal helper for synchronous jobs. It enters
562  * a new thread loop for evaluating the given job
563  * as recorded by the couple 'sync_cb' and 'sync'.
564  * @see jobs_call, jobs_enter, jobs_leave
565  */
566 static int do_sync(
567                 const void *group,
568                 int timeout,
569                 void (*sync_cb)(int signum, void *closure),
570                 struct sync *sync
571 )
572 {
573         struct job *job;
574
575         pthread_mutex_lock(&mutex);
576
577         /* allocates the job */
578         job = job_create(group, timeout, sync_cb, sync);
579         if (!job) {
580                 pthread_mutex_unlock(&mutex);
581                 return -1;
582         }
583
584         /* queues the job */
585         job_add(job);
586
587         /* run until stopped */
588         if (current_thread)
589                 thread_run_internal(&sync->thread);
590         else
591                 thread_run_external(&sync->thread);
592         pthread_mutex_unlock(&mutex);
593         return 0;
594 }
595
596 /**
597  * Enter a synchronisation point: activates the job given by 'callback'
598  * and 'closure' using 'group' and 'timeout' to control sequencing and
599  * execution time.
600  * @param group the group for sequencing jobs
601  * @param timeout the time in seconds allocated to the job
602  * @param callback the callback that will handle the job.
603  *                 it receives 3 parameters: 'signum' that will be 0
604  *                 on normal flow or the catched signal number in case
605  *                 of interrupted flow, the context 'closure' as given and
606  *                 a 'jobloop' reference that must be used when the job is
607  *                 terminated to unlock the current execution flow.
608  * @param closure the argument to the callback
609  * @return 0 on success or -1 in case of error
610  */
611 int jobs_enter(
612                 const void *group,
613                 int timeout,
614                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
615                 void *closure
616 )
617 {
618         struct sync sync;
619
620         sync.enter = callback;
621         sync.arg = closure;
622         return do_sync(group, timeout, enter_cb, &sync);
623 }
624
625 /**
626  * Unlocks the execution flow designed by 'jobloop'.
627  * @param jobloop indication of the flow to unlock
628  * @return 0 in case of success of -1 on error
629  */
630 int jobs_leave(struct jobloop *jobloop)
631 {
632         struct thread *t;
633
634         pthread_mutex_lock(&mutex);
635         t = threads;
636         while (t && t != (struct thread*)jobloop)
637                 t = t->next;
638         if (!t) {
639                 errno = EINVAL;
640         } else {
641                 t->stop = 1;
642                 if (t->waits)
643                         pthread_cond_broadcast(&cond);
644                 else
645                         evloop_wakeup();
646         }
647         pthread_mutex_unlock(&mutex);
648         return -!t;
649 }
650
651 /**
652  * Calls synchronously the job represented by 'callback' and 'arg1'
653  * for the 'group' and the 'timeout' and waits for its completion.
654  * @param group    The group of the job or NULL when no group.
655  * @param timeout  The maximum execution time in seconds of the job
656  *                 or 0 for unlimited time.
657  * @param callback The function to execute for achieving the job.
658  *                 Its first parameter is either 0 on normal flow
659  *                 or the signal number that broke the normal flow.
660  *                 The remaining parameter is the parameter 'arg1'
661  *                 given here.
662  * @param arg      The second argument for 'callback'
663  * @return 0 in case of success or -1 in case of error
664  */
665 int jobs_call(
666                 const void *group,
667                 int timeout,
668                 void (*callback)(int, void*),
669                 void *arg)
670 {
671         struct sync sync;
672
673         sync.callback = callback;
674         sync.arg = arg;
675
676         return do_sync(group, timeout, call_cb, &sync);
677 }
678
679 /**
680  * Ensure that the current running thread can control the event loop.
681  */
682 void jobs_acquire_event_manager()
683 {
684         struct thread lt;
685
686         /* ensure an existing thread environment */
687         if (!current_thread) {
688                 memset(&lt, 0, sizeof lt);
689                 current_thread = &lt;
690         }
691
692         /* lock */
693         pthread_mutex_lock(&mutex);
694
695         /* creates the evloop on need */
696         if (!evmgr)
697                 evmgr_create(&evmgr);
698
699         /* acquire the event loop under lock */
700         if (evmgr)
701                 evloop_acquire();
702
703         /* unlock */
704         pthread_mutex_unlock(&mutex);
705
706         /* release the faked thread environment if needed */
707         if (current_thread == &lt) {
708                 /*
709                  * Releasing it is needed because there is no way to guess
710                  * when it has to be released really. But here is where it is
711                  * hazardous: if the caller modifies the eventloop when it
712                  * is waiting, there is no way to make the change effective.
713                  * A workaround to achieve that goal is for the caller to
714                  * require the event loop a second time after having modified it.
715                  */
716                 NOTICE("Requiring event manager/loop from outside of binder's callback is hazardous!");
717                 if (verbose_wants(Log_Level_Info))
718                         sig_monitor_dumpstack();
719                 evloop_release();
720                 current_thread = NULL;
721         }
722 }
723
724 /**
725  * Enter the jobs processing loop.
726  * @param allowed_count Maximum count of thread for jobs including this one
727  * @param start_count   Count of thread to start now, must be lower.
728  * @param waiter_count  Maximum count of jobs that can be waiting.
729  * @param start         The start routine to activate (can't be NULL)
730  * @return 0 in case of success or -1 in case of error.
731  */
732 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
733 {
734         int rc, launched;
735         struct job *job;
736
737         assert(allowed_count >= 1);
738         assert(start_count >= 0);
739         assert(waiter_count > 0);
740         assert(start_count <= allowed_count);
741
742         rc = -1;
743         pthread_mutex_lock(&mutex);
744
745         /* check whether already running */
746         if (current_thread || allowed) {
747                 ERROR("thread already started");
748                 errno = EINVAL;
749                 goto error;
750         }
751
752         /* records the allowed count */
753         allowed = allowed_count;
754         started = 0;
755         running = 0;
756         remains = waiter_count;
757
758         /* start at least one thread: the current one */
759         launched = 1;
760         while (launched < start_count) {
761                 if (start_one_thread() != 0) {
762                         ERROR("Not all threads can be started");
763                         goto error;
764                 }
765                 launched++;
766         }
767
768         /* queue the start job */
769         job = job_create(NULL, 0, start, arg);
770         if (!job)
771                 goto error;
772         job_add(job);
773
774         /* run until end */
775         thread_main();
776         rc = 0;
777 error:
778         pthread_mutex_unlock(&mutex);
779         return rc;
780 }
781
782 /**
783  * Terminate all the threads and cancel all pending jobs.
784  */
785 void jobs_terminate()
786 {
787         struct job *job, *head, *tail;
788         pthread_t me, *others;
789         struct thread *t;
790         int count;
791
792         /* how am i? */
793         me = pthread_self();
794
795         /* request all threads to stop */
796         pthread_mutex_lock(&mutex);
797         allowed = 0;
798
799         /* count the number of threads */
800         count = 0;
801         t = threads;
802         while (t) {
803                 if (!t->upper && !pthread_equal(t->tid, me))
804                         count++;
805                 t = t->next;
806         }
807
808         /* fill the array of threads */
809         others = alloca(count * sizeof *others);
810         count = 0;
811         t = threads;
812         while (t) {
813                 if (!t->upper && !pthread_equal(t->tid, me))
814                         others[count++] = t->tid;
815                 t = t->next;
816         }
817
818         /* stops the threads */
819         t = threads;
820         while (t) {
821                 t->stop = 1;
822                 t = t->next;
823         }
824
825         /* wait the threads */
826         pthread_cond_broadcast(&cond);
827         pthread_mutex_unlock(&mutex);
828         while (count)
829                 pthread_join(others[--count], NULL);
830         pthread_mutex_lock(&mutex);
831
832         /* cancel pending jobs of other threads */
833         remains = 0;
834         head = first_job;
835         first_job = NULL;
836         tail = NULL;
837         while (head) {
838                 /* unlink the job */
839                 job = head;
840                 head = job->next;
841
842                 /* search if job is stacked for current */
843                 t = current_thread;
844                 while (t && t->job != job)
845                         t = t->upper;
846                 if (t) {
847                         /* yes, relink it at end */
848                         if (tail)
849                                 tail->next = job;
850                         else
851                                 first_job = job;
852                         tail = job;
853                         job->next = NULL;
854                 } else {
855                         /* no cancel the job */
856                         pthread_mutex_unlock(&mutex);
857                         sig_monitor(0, job_cancel, job);
858                         free(job);
859                         pthread_mutex_lock(&mutex);
860                 }
861         }
862         pthread_mutex_unlock(&mutex);
863 }
864