/*
 * Copyright (C) 2016-2019 "IoT.bzh"
 * Author José Bollo <jose.bollo@iot.bzh>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/* enable the systemd watchdog unless explicitly disabled at build time */
#if defined(NO_JOBS_WATCHDOG)
#  define HAS_WATCHDOG 0
#else
#  define HAS_WATCHDOG 1
#endif
#define _GNU_SOURCE

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <assert.h>
#include <signal.h>
#include <time.h>
#include <pthread.h>
#include <sys/syscall.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>

#include <systemd/sd-event.h>
#include <systemd/sd-daemon.h>

#include "jobs.h"
#include "sig-monitor.h"
#include "verbose.h"
/* polling timeouts: infinite for the main loop, 10ms for children */
#define EVENT_TIMEOUT_TOP	((uint64_t)-1)
#define EVENT_TIMEOUT_CHILD	((uint64_t)10000)
/** Internal shortcut for callback */
typedef void (*job_cb_t)(int, void*);

/** Description of a pending job */
struct job
{
	struct job *next;    /**< link to the next job enqueued */
	const void *group;   /**< group of the request */
	job_cb_t callback;   /**< processing callback */
	void *arg;           /**< argument */
	int timeout;         /**< timeout in second for processing the request */
	unsigned blocked: 1; /**< is an other request blocking this one ? */
	unsigned dropped: 1; /**< is removed ? */
};
/** Description of handled event loops */
struct evloop
{
	unsigned state;        /**< encoded state (see EVLOOP_STATE_*) */
	int efd;               /**< event notification (eventfd) */
	struct sd_event *sdev; /**< the systemd event loop */
	struct thread *holder; /**< holder of the evloop */
};

/* encoded states of the event loop */
#define EVLOOP_STATE_WAIT 1U  /**< the loop is blocked in poll/wait */
#define EVLOOP_STATE_RUN  2U  /**< the loop is dispatching events */
/** Description of threads */
struct thread
{
	struct thread *next;   /**< next thread of the list */
	struct thread *upper;  /**< upper same thread */
	struct thread *nholder;/**< next holder for evloop */
	pthread_cond_t *cwhold;/**< condition wait for holding */
	struct job *job;       /**< currently processed job */
	pthread_t tid;         /**< the thread id */
	volatile unsigned stop: 1;  /**< stop requested */
	volatile unsigned waits: 1; /**< is waiting? */
};
93 * Description of synchronous callback
97 struct thread thread; /**< thread loop data */
99 void (*callback)(int, void*); /**< the synchronous callback */
100 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
101 /**< the entering synchronous routine */
103 void *arg; /**< the argument of the callback */
107 /* synchronisation of threads */
108 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
109 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
111 /* count allowed, started and running threads */
112 static int allowed = 0; /** allowed count of threads */
113 static int started = 0; /** started count of threads */
114 static int running = 0; /** running count of threads */
115 static int remains = 0; /** allowed count of waiting jobs */
117 /* list of threads */
118 static struct thread *threads;
119 static _Thread_local struct thread *current_thread;
121 /* queue of pending jobs */
122 static struct job *first_job;
123 static struct job *free_jobs;
126 static struct evloop evloop;
129 * Create a new job with the given parameters
130 * @param group the group of the job
131 * @param timeout the timeout of the job (0 if none)
132 * @param callback the function that achieves the job
133 * @param arg the argument of the callback
134 * @return the created job unblock or NULL when no more memory
136 static struct job *job_create(
144 /* try recyle existing job */
147 free_jobs = job->next;
149 /* allocation without blocking */
150 pthread_mutex_unlock(&mutex);
151 job = malloc(sizeof *job);
152 pthread_mutex_lock(&mutex);
154 ERROR("out of memory");
159 /* initialises the job */
161 job->timeout = timeout;
162 job->callback = callback;
171 * Adds 'job' at the end of the list of jobs, marking it
172 * as blocked if an other job with the same group is pending.
173 * @param job the job to add
175 static void job_add(struct job *job)
178 struct job *ijob, **pjob;
184 /* search end and blockers */
188 if (group && ijob->group == group)
200 * Get the next job to process or NULL if none.
201 * @return the first job that isn't blocked or NULL
203 static inline struct job *job_get()
205 struct job *job = first_job;
206 while (job && job->blocked)
214 * Releases the processed 'job': removes it
215 * from the list of jobs and unblock the first
216 * pending job of the same group if any.
217 * @param job the job to release
219 static inline void job_release(struct job *job)
221 struct job *ijob, **pjob;
224 /* first unqueue the job */
227 while (ijob != job) {
233 /* then unblock jobs of the same group */
237 while (ijob && ijob->group != group)
243 /* recycle the job */
244 job->next = free_jobs;
249 * Monitored cancel callback for a job.
250 * This function is called by the monitor
251 * to cancel the job when the safe environment
253 * @param signum 0 on normal flow or the number
254 * of the signal that interrupted the normal
256 * @param arg the job to run
258 static void job_cancel(int signum, void *arg)
260 struct job *job = arg;
261 job->callback(SIGABRT, job->arg);
265 * Monitored normal callback for events.
266 * This function is called by the monitor
267 * to run the event loop when the safe environment
269 * @param signum 0 on normal flow or the number
270 * of the signal that interrupted the normal
272 * @param arg the events to run
274 static void evloop_run(int signum, void *arg)
281 rc = sd_event_prepare(se);
284 CRITICAL("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
288 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
291 ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
294 evloop.state = EVLOOP_STATE_RUN;
296 rc = sd_event_dispatch(se);
299 ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
307 * Internal callback for evloop management.
308 * The effect of this function is hidden: it exits
309 * the waiting poll if any.
311 static void evloop_on_efd_event()
314 read(evloop.efd, &x, sizeof x);
318 * wakeup the event loop if needed by sending
321 static void evloop_wakeup()
325 if (evloop.state & EVLOOP_STATE_WAIT) {
327 write(evloop.efd, &x, sizeof x);
332 * Release the currently held event loop
334 static void evloop_release()
336 struct thread *nh, *ct = current_thread;
338 if (ct && evloop.holder == ct) {
342 pthread_cond_signal(nh->cwhold);
347 * get the eventloop for the current thread
349 static int evloop_get()
351 struct thread *ct = current_thread;
354 return evloop.holder == ct;
365 * acquire the eventloop for the current thread
367 static void evloop_acquire()
369 struct thread **pwait, *ct;
372 /* try to get the evloop */
374 /* failed, init waiting state */
378 pthread_cond_init(&cond, NULL);
380 /* queue current thread in holder list */
381 pwait = &evloop.holder;
383 pwait = &(*pwait)->nholder;
386 /* wake up the evloop */
389 /* wait to acquire the evloop */
390 pthread_cond_wait(&cond, &mutex);
391 pthread_cond_destroy(&cond);
397 * @param me the description of the thread to enter
399 static void thread_enter(volatile struct thread *me)
402 /* initialize description of itself and link it in the list */
403 me->tid = pthread_self();
406 me->upper = current_thread;
408 threads = (struct thread*)me;
409 current_thread = (struct thread*)me;
414 * @param me the description of the thread to leave
416 static void thread_leave()
418 struct thread **prv, *me;
420 /* unlink the current thread and cleanup */
427 current_thread = me->upper;
431 * Main processing loop of internal threads with processing jobs.
432 * The loop must be called with the mutex locked
433 * and it returns with the mutex locked.
434 * @param me the description of the thread to use
435 * TODO: how are timeout handled when reentering?
437 static void thread_run_internal(volatile struct thread *me)
444 /* loop until stopped */
446 /* release the current event loop */
452 /* prepare running the job */
453 job->blocked = 1; /* mark job as blocked */
454 me->job = job; /* record the job (only for terminate) */
457 pthread_mutex_unlock(&mutex);
458 sig_monitor(job->timeout, job->callback, job->arg);
459 pthread_mutex_lock(&mutex);
461 /* release the run job */
463 /* no job, check event loop wait */
464 } else if (evloop_get()) {
465 if (evloop.state != 0) {
467 CRITICAL("Can't enter dispatch while in dispatch!");
471 evloop.state = EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT;
472 pthread_mutex_unlock(&mutex);
473 sig_monitor(0, evloop_run, NULL);
474 pthread_mutex_lock(&mutex);
477 /* no job and no event loop */
480 ERROR("Entering job deep sleep! Check your bindings.");
482 pthread_cond_wait(&cond, &mutex);
493 * Main processing loop of external threads.
494 * The loop must be called with the mutex locked
495 * and it returns with the mutex locked.
496 * @param me the description of the thread to use
498 static void thread_run_external(volatile struct thread *me)
503 /* loop until stopped */
506 pthread_cond_wait(&cond, &mutex);
512 * Root for created threads.
514 static void thread_main()
520 sig_monitor_init_timeouts();
521 thread_run_internal(&me);
522 sig_monitor_clean_timeouts();
528 * Entry point for created threads.
529 * @param data not used
532 static void *thread_starter(void *data)
534 pthread_mutex_lock(&mutex);
536 pthread_mutex_unlock(&mutex);
541 * Starts a new thread
542 * @return 0 in case of success or -1 in case of error
544 static int start_one_thread()
549 rc = pthread_create(&tid, NULL, thread_starter, NULL);
552 WARNING("not able to start thread: %m");
559 * Queues a new asynchronous job represented by 'callback' and 'arg'
560 * for the 'group' and the 'timeout'.
561 * Jobs are queued FIFO and are possibly executed in parallel
562 * concurrently except for job of the same group that are
563 * executed sequentially in FIFO order.
564 * @param group The group of the job or NULL when no group.
565 * @param timeout The maximum execution time in seconds of the job
566 * or 0 for unlimited time.
567 * @param callback The function to execute for achieving the job.
568 * Its first parameter is either 0 on normal flow
569 * or the signal number that broke the normal flow.
570 * The remaining parameter is the parameter 'arg1'
572 * @param arg The second argument for 'callback'
573 * @return 0 in case of success or -1 in case of error
578 void (*callback)(int, void*),
584 pthread_mutex_lock(&mutex);
586 /* allocates the job */
587 job = job_create(group, timeout, callback, arg);
591 /* check availability */
593 ERROR("can't process job with threads: too many jobs");
598 /* start a thread if needed */
599 if (running == started && started < allowed) {
600 /* all threads are busy and a new can be started */
601 rc = start_one_thread();
602 if (rc < 0 && started == 0) {
603 ERROR("can't start initial thread: %m");
611 /* signal an existing job */
612 pthread_cond_signal(&cond);
613 pthread_mutex_unlock(&mutex);
617 job->next = free_jobs;
620 pthread_mutex_unlock(&mutex);
625 * Internal helper function for 'jobs_enter'.
626 * @see jobs_enter, jobs_leave
628 static void enter_cb(int signum, void *closure)
630 struct sync *sync = closure;
631 sync->enter(signum, sync->arg, (void*)&sync->thread);
635 * Internal helper function for 'jobs_call'.
638 static void call_cb(int signum, void *closure)
640 struct sync *sync = closure;
641 sync->callback(signum, sync->arg);
642 jobs_leave((void*)&sync->thread);
646 * Internal helper for synchronous jobs. It enters
647 * a new thread loop for evaluating the given job
648 * as recorded by the couple 'sync_cb' and 'sync'.
649 * @see jobs_call, jobs_enter, jobs_leave
654 void (*sync_cb)(int signum, void *closure),
660 pthread_mutex_lock(&mutex);
662 /* allocates the job */
663 job = job_create(group, timeout, sync_cb, sync);
665 pthread_mutex_unlock(&mutex);
672 /* run until stopped */
674 thread_run_internal(&sync->thread);
676 thread_run_external(&sync->thread);
677 pthread_mutex_unlock(&mutex);
682 * Enter a synchronisation point: activates the job given by 'callback'
683 * and 'closure' using 'group' and 'timeout' to control sequencing and
685 * @param group the group for sequencing jobs
686 * @param timeout the time in seconds allocated to the job
687 * @param callback the callback that will handle the job.
688 * it receives 3 parameters: 'signum' that will be 0
689 * on normal flow or the catched signal number in case
690 * of interrupted flow, the context 'closure' as given and
691 * a 'jobloop' reference that must be used when the job is
692 * terminated to unlock the current execution flow.
693 * @param closure the argument to the callback
694 * @return 0 on success or -1 in case of error
699 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
705 sync.enter = callback;
707 return do_sync(group, timeout, enter_cb, &sync);
711 * Unlocks the execution flow designed by 'jobloop'.
712 * @param jobloop indication of the flow to unlock
713 * @return 0 in case of success of -1 on error
715 int jobs_leave(struct jobloop *jobloop)
719 pthread_mutex_lock(&mutex);
721 while (t && t != (struct thread*)jobloop)
728 pthread_cond_broadcast(&cond);
732 pthread_mutex_unlock(&mutex);
737 * Calls synchronously the job represented by 'callback' and 'arg1'
738 * for the 'group' and the 'timeout' and waits for its completion.
739 * @param group The group of the job or NULL when no group.
740 * @param timeout The maximum execution time in seconds of the job
741 * or 0 for unlimited time.
742 * @param callback The function to execute for achieving the job.
743 * Its first parameter is either 0 on normal flow
744 * or the signal number that broke the normal flow.
745 * The remaining parameter is the parameter 'arg1'
747 * @param arg The second argument for 'callback'
748 * @return 0 in case of success or -1 in case of error
753 void (*callback)(int, void*),
758 sync.callback = callback;
761 return do_sync(group, timeout, call_cb, &sync);
765 * Internal callback for evloop management.
766 * The effect of this function is hidden: it exits
767 * the waiting poll if any. Then it wakes up a thread
768 * awaiting the evloop using signal.
770 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
772 evloop_on_efd_event();
777 * Gets a sd_event item for the current thread.
778 * @return a sd_event or NULL in case of error
780 static struct sd_event *get_sd_event_locked()
784 /* creates the evloop on need */
786 /* start the creation */
788 /* creates the eventfd for waking up polls */
789 evloop.efd = eventfd(0, EFD_CLOEXEC|EFD_SEMAPHORE);
790 if (evloop.efd < 0) {
791 ERROR("can't make eventfd for events");
794 /* create the systemd event loop */
795 rc = sd_event_new(&evloop.sdev);
797 ERROR("can't make new event loop");
800 /* put the eventfd in the event loop */
801 rc = sd_event_add_io(evloop.sdev, NULL, evloop.efd, EPOLLIN, on_evloop_efd, NULL);
803 ERROR("can't register eventfd");
804 sd_event_unref(evloop.sdev);
813 /* acquire the event loop */
820 * Gets a sd_event item for the current thread.
821 * @return a sd_event or NULL in case of error
823 struct sd_event *jobs_get_sd_event()
825 struct sd_event *result;
828 /* ensure an existing thread environment */
829 if (!current_thread) {
830 memset(<, 0, sizeof lt);
831 current_thread = <
835 pthread_mutex_lock(&mutex);
836 result = get_sd_event_locked();
837 pthread_mutex_unlock(&mutex);
839 /* release the faked thread environment if needed */
840 if (current_thread == <) {
842 * Releasing it is needed because there is no way to guess
843 * when it has to be released really. But here is where it is
844 * hazardous: if the caller modifies the eventloop when it
845 * is waiting, there is no way to make the change effective.
846 * A workaround to achieve that goal is for the caller to
847 * require the event loop a second time after having modified it.
849 NOTICE("Requiring sd_event loop out of binder callbacks is hazardous!");
850 if (verbose_wants(Log_Level_Info))
851 sig_monitor_dumpstack();
853 current_thread = NULL;
860 * Enter the jobs processing loop.
861 * @param allowed_count Maximum count of thread for jobs including this one
862 * @param start_count Count of thread to start now, must be lower.
863 * @param waiter_count Maximum count of jobs that can be waiting.
864 * @param start The start routine to activate (can't be NULL)
865 * @return 0 in case of success or -1 in case of error.
867 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
872 assert(allowed_count >= 1);
873 assert(start_count >= 0);
874 assert(waiter_count > 0);
875 assert(start_count <= allowed_count);
878 pthread_mutex_lock(&mutex);
880 /* check whether already running */
881 if (current_thread || allowed) {
882 ERROR("thread already started");
887 /* records the allowed count */
888 allowed = allowed_count;
891 remains = waiter_count;
894 /* set the watchdog */
895 if (sd_watchdog_enabled(0, NULL))
896 sd_event_set_watchdog(get_sd_event_locked(), 1);
899 /* start at least one thread: the current one */
901 while (launched < start_count) {
902 if (start_one_thread() != 0) {
903 ERROR("Not all threads can be started");
909 /* queue the start job */
910 job = job_create(NULL, 0, start, arg);
919 pthread_mutex_unlock(&mutex);
924 * Terminate all the threads and cancel all pending jobs.
926 void jobs_terminate()
928 struct job *job, *head, *tail;
929 pthread_t me, *others;
936 /* request all threads to stop */
937 pthread_mutex_lock(&mutex);
940 /* count the number of threads */
944 if (!t->upper && !pthread_equal(t->tid, me))
949 /* fill the array of threads */
950 others = alloca(count * sizeof *others);
954 if (!t->upper && !pthread_equal(t->tid, me))
955 others[count++] = t->tid;
959 /* stops the threads */
966 /* wait the threads */
967 pthread_cond_broadcast(&cond);
968 pthread_mutex_unlock(&mutex);
970 pthread_join(others[--count], NULL);
971 pthread_mutex_lock(&mutex);
973 /* cancel pending jobs of other threads */
983 /* search if job is stacked for current */
985 while (t && t->job != job)
988 /* yes, relink it at end */
996 /* no cancel the job */
997 pthread_mutex_unlock(&mutex);
998 sig_monitor(0, job_cancel, job);
1000 pthread_mutex_lock(&mutex);
1003 pthread_mutex_unlock(&mutex);