2 * Copyright (C) 2016, 2017, 2018 "IoT.bzh"
3 * Author José Bollo <jose.bollo@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
20 #if defined(NO_JOBS_WATCHDOG)
21 # define HAS_WATCHDOG 0
23 # define HAS_WATCHDOG 1
32 #include <sys/syscall.h>
36 #include <sys/eventfd.h>
38 #include <systemd/sd-event.h>
41 #include <systemd/sd-daemon.h>
45 #include "fdev-epoll.h"
46 #include "sig-monitor.h"
50 #define _alert_ "do you really want to remove signal monitoring?"
51 #define sig_monitor_init_timeouts() ((void)0)
52 #define sig_monitor_clean_timeouts() ((void)0)
53 #define sig_monitor(to,cb,arg) (cb(0,arg))
/* NOTE(review): the two EVENT_TIMEOUT_* values are not referenced in the visible part of this file; presumably timeouts for waiting events -- unit and usage to be confirmed */
56 #define EVENT_TIMEOUT_TOP ((uint64_t)-1) /* all bits set: the greatest uint64_t value */
57 #define EVENT_TIMEOUT_CHILD ((uint64_t)10000)
59 /** Internal shortcut for the type of job callbacks: the first argument is 0 on normal flow or a signal number (job_cancel passes SIGABRT), the second is the job's argument */
60 typedef void (*job_cb_t)(int, void*);
62 /** Description of a pending job */
65 struct job *next; /**< link to the next job enqueued */
66 const void *group; /**< group of the request */
67 job_cb_t callback; /**< processing callback */
68 void *arg; /**< argument */
69 int timeout; /**< timeout in seconds for processing the request */
70 unsigned blocked: 1; /**< is another request blocking this one? */
71 unsigned dropped: 1; /**< has the job been removed? */
74 /** Description of handled event loops */
77 unsigned state; /**< encoded state */
78 int efd; /**< event notification */
79 struct sd_event *sdev; /**< the systemd event loop */
80 pthread_cond_t cond; /**< condition */
81 struct fdev *fdev; /**< handling of events */
/* Bit flags combined in the 'state' field of struct evloop; they are read and written with __atomic builtins. */
84 #define EVLOOP_STATE_WAIT 1U /**< set by evloop_run before preparing/waiting the sd_event, cleared afterwards; get_sd_event_locked waits while it is set */
85 #define EVLOOP_STATE_RUN 2U /**< set by evloop_run when it starts and cleared (together with WAIT) when it ends */
86 #define EVLOOP_STATE_LOCK 4U /**< set when a thread attaches the event loop, cleared when the thread releases it */
88 /** Description of threads */
91 struct thread *next; /**< next thread of the list */
92 struct thread *upper; /**< upper same thread */
93 struct job *job; /**< currently processed job */
94 pthread_t tid; /**< the thread id */
95 volatile unsigned stop: 1; /**< stop requested */
96 volatile unsigned waits: 1; /**< is waiting? */
100 * Description of synchronous callback
104 struct thread thread; /**< thread loop data */
106 void (*callback)(int, void*); /**< the synchronous callback */
107 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
108 /**< the entering synchronous routine */
110 void *arg; /**< the argument of the callback */
114 /* synchronisation of threads */
115 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /**< held around accesses to the job queue, the thread list and the counters below */
116 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; /**< idle threads wait on it (thread_run); signalled when a job is queued, broadcast on leave/terminate */
118 /* count allowed, started and running threads */
119 static int allowed = 0; /** allowed count of threads (set by jobs_start from 'allowed_count') */
120 static int started = 0; /** started count of threads */
121 static int running = 0; /** running count of threads */
122 static int remains = 0; /** allowed count of waiting jobs (set by jobs_start from 'waiter_count', incremented when a thread takes a job) */
124 /* list of threads */
125 static struct thread *threads; /**< head of the list of thread descriptions */
126 static _Thread_local struct thread *current_thread; /**< per thread: description currently used by the calling thread, restored to its 'upper' when thread_run ends */
127 static _Thread_local struct evloop *current_evloop; /**< per thread: event loop attached to the calling thread, NULL when none */
129 /* queue of pending jobs */
130 static struct job *first_job; /**< head of the queue of jobs; job_get scans it from here */
131 static struct job *free_jobs; /**< released jobs kept for reuse by job_create */
134 static struct evloop evloop[1]; /**< the event loop description (a single one) */
135 static struct fdev_epoll *fdevepoll; /**< assigned in get_fdevepoll from fdev_epoll_create() */
139 * Create a new job with the given parameters
140 * @param group the group of the job
141 * @param timeout the timeout of the job (0 if none)
142 * @param callback the function that achieves the job
143 * @param arg the argument of the callback
144 * @return the created job (unblocked) or NULL when out of memory
146 static struct job *job_create(
154 /* try to recycle an existing job */
157 free_jobs = job->next;
159 /* allocation without blocking */
160 pthread_mutex_unlock(&mutex);
161 job = malloc(sizeof *job);
162 pthread_mutex_lock(&mutex);
168 /* initialises the job */
170 job->timeout = timeout;
171 job->callback = callback;
180 * Adds 'job' at the end of the list of jobs, marking it
181 * as blocked if another job with the same group is pending.
182 * @param job the job to add
184 static void job_add(struct job *job)
187 struct job *ijob, **pjob;
193 /* search end and blockers */
197 if (group && ijob->group == group)
208 * Get the next job to process or NULL if none.
209 * @return the first job that isn't blocked or NULL
211 static inline struct job *job_get()
213 struct job *job = first_job;
214 while (job && job->blocked)
220 * Releases the processed 'job': removes it
221 * from the list of jobs and unblocks the first
222 * pending job of the same group if any.
223 * @param job the job to release
225 static inline void job_release(struct job *job)
227 struct job *ijob, **pjob;
230 /* first unqueue the job */
233 while (ijob != job) {
239 /* then unblock jobs of the same group */
243 while (ijob && ijob->group != group)
249 /* recycle the job */
250 job->next = free_jobs;
255 * Monitored cancel callback for a job.
256 * This function is called by the monitor
257 * to cancel the job when the safe environment
259 * @param signum 0 on normal flow or the number
260 * of the signal that interrupted the normal
262 * @param arg the job to run
264 static void job_cancel(int signum, void *arg)
266 struct job *job = arg;
267 job->callback(SIGABRT, job->arg);
271 * Gets a fdev_epoll item.
272 * @return a fdev_epoll or NULL in case of error
274 static struct fdev_epoll *get_fdevepoll()
276 struct fdev_epoll *result;
280 result = fdevepoll = fdev_epoll_create();
286 * Monitored normal callback for events.
287 * This function is called by the monitor
288 * to run the event loop when the safe environment
290 * @param signum 0 on normal flow or the number
291 * of the signal that interrupted the normal
293 * @param arg the events to run
295 static void evloop_run(int signum, void *arg)
299 struct evloop *el = arg;
303 __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
305 rc = sd_event_prepare(se);
308 ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
311 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
314 ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
317 __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT), __ATOMIC_RELAXED);
320 rc = sd_event_dispatch(se);
323 ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
328 __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT|EVLOOP_STATE_RUN), __ATOMIC_RELAXED);
333 * Monitored normal loop for waiting events.
334 * @param signum 0 on normal flow or the number
335 * of the signal that interrupted the normal
337 * @param arg the events to run
339 static void monitored_wait_and_dispatch(int signum, void *arg)
341 struct fdev_epoll *fdev_epoll = arg;
343 fdev_epoll_wait_and_dispatch(fdev_epoll, -1);
348 * Main processing loop of threads processing jobs.
349 * The loop must be called with the mutex locked
350 * and it returns with the mutex locked.
351 * @param me the description of the thread to use
352 * TODO: how are timeouts handled when reentering?
354 static void thread_run(volatile struct thread *me)
359 /* initialize description of itself and link it in the list */
360 me->tid = pthread_self();
363 me->upper = current_thread;
364 if (!current_thread) {
366 sig_monitor_init_timeouts();
369 threads = (struct thread*)me;
370 current_thread = (struct thread*)me;
372 /* loop until stopped */
374 /* release the event loop */
375 if (current_evloop) {
376 __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
377 current_evloop = NULL;
383 /* prepare running the job */
384 remains++; /* increases count of job that can wait */
385 job->blocked = 1; /* mark job as blocked */
386 me->job = job; /* record the job (only for terminate) */
389 pthread_mutex_unlock(&mutex);
390 sig_monitor(job->timeout, job->callback, job->arg);
391 pthread_mutex_lock(&mutex);
393 /* release the run job */
395 } else if (waitevt) {
396 /* no job and no events */
399 ERROR("Entering job deep sleep! Check your bindings.");
401 pthread_cond_wait(&cond, &mutex);
405 /* wait for events */
407 pthread_mutex_unlock(&mutex);
408 sig_monitor(0, monitored_wait_and_dispatch, get_fdevepoll());
409 pthread_mutex_lock(&mutex);
414 /* release the event loop */
415 if (current_evloop) {
416 __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
417 current_evloop = NULL;
420 /* unlink the current thread and cleanup */
425 current_thread = me->upper;
426 if (!current_thread) {
427 sig_monitor_clean_timeouts();
433 * Entry point for created threads.
434 * @param data not used
437 static void *thread_main(void *data)
441 pthread_mutex_lock(&mutex);
445 pthread_mutex_unlock(&mutex);
450 * Starts a new thread
451 * @return 0 in case of success or -1 in case of error
453 static int start_one_thread()
458 rc = pthread_create(&tid, NULL, thread_main, NULL);
461 WARNING("not able to start thread: %m");
468 * Queues a new asynchronous job represented by 'callback' and 'arg'
469 * for the 'group' and the 'timeout'.
470 * Jobs are queued FIFO and are possibly executed in parallel
471 * concurrently, except for jobs of the same group, which are
472 * executed sequentially in FIFO order.
473 * @param group The group of the job or NULL when no group.
474 * @param timeout The maximum execution time in seconds of the job
475 * or 0 for unlimited time.
476 * @param callback The function to execute for achieving the job.
477 * Its first parameter is either 0 on normal flow
478 * or the signal number that broke the normal flow.
479 * The remaining parameter is the parameter 'arg'
481 * @param arg The second argument for 'callback'
482 * @return 0 in case of success or -1 in case of error
487 void (*callback)(int, void*),
494 pthread_mutex_lock(&mutex);
496 /* allocates the job */
497 job = job_create(group, timeout, callback, arg);
500 info = "out of memory";
504 /* check availability */
507 info = "too many jobs";
511 /* start a thread if needed */
512 if (running == started && started < allowed) {
513 /* all threads are busy and a new one can be started */
514 rc = start_one_thread();
515 if (rc < 0 && started == 0) {
516 info = "can't start first thread";
525 /* signal a waiting thread */
526 pthread_cond_signal(&cond);
527 pthread_mutex_unlock(&mutex);
531 job->next = free_jobs;
534 ERROR("can't process job with threads: %s, %m", info);
535 pthread_mutex_unlock(&mutex);
540 * Internal helper function for 'jobs_enter'.
541 * @see jobs_enter, jobs_leave
543 static void enter_cb(int signum, void *closure)
545 struct sync *sync = closure;
546 sync->enter(signum, sync->arg, (void*)&sync->thread);
550 * Internal helper function for 'jobs_call'.
553 static void call_cb(int signum, void *closure)
555 struct sync *sync = closure;
556 sync->callback(signum, sync->arg);
557 jobs_leave((void*)&sync->thread);
561 * Internal helper for synchronous jobs. It enters
562 * a new thread loop for evaluating the given job
563 * as recorded by the couple 'sync_cb' and 'sync'.
564 * @see jobs_call, jobs_enter, jobs_leave
569 void (*sync_cb)(int signum, void *closure),
575 pthread_mutex_lock(&mutex);
577 /* allocates the job */
578 job = job_create(group, timeout, sync_cb, sync);
580 ERROR("out of memory");
582 pthread_mutex_unlock(&mutex);
589 /* run until stopped */
590 thread_run(&sync->thread);
591 pthread_mutex_unlock(&mutex);
596 * Enter a synchronisation point: activates the job given by 'callback'
597 * and 'closure' using 'group' and 'timeout' to control sequencing and
599 * @param group the group for sequencing jobs
600 * @param timeout the time in seconds allocated to the job
601 * @param callback the callback that will handle the job.
602 * it receives 3 parameters: 'signum' that will be 0
603 * on normal flow or the caught signal number in case
604 * of interrupted flow, the context 'closure' as given and
605 * a 'jobloop' reference that must be used when the job is
606 * terminated to unlock the current execution flow.
607 * @param closure the argument to the callback
608 * @return 0 on success or -1 in case of error
613 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
619 sync.enter = callback;
621 return do_sync(group, timeout, enter_cb, &sync);
625 * Unlocks the execution flow designed by 'jobloop'.
626 * @param jobloop indication of the flow to unlock
627 * @return 0 in case of success or -1 on error
629 int jobs_leave(struct jobloop *jobloop)
633 pthread_mutex_lock(&mutex);
635 while (t && t != (struct thread*)jobloop)
642 pthread_cond_broadcast(&cond);
644 pthread_mutex_unlock(&mutex);
649 * Calls synchronously the job represented by 'callback' and 'arg'
650 * for the 'group' and the 'timeout' and waits for its completion.
651 * @param group The group of the job or NULL when no group.
652 * @param timeout The maximum execution time in seconds of the job
653 * or 0 for unlimited time.
654 * @param callback The function to execute for achieving the job.
655 * Its first parameter is either 0 on normal flow
656 * or the signal number that broke the normal flow.
657 * The remaining parameter is the parameter 'arg'
659 * @param arg The second argument for 'callback'
660 * @return 0 in case of success or -1 in case of error
665 void (*callback)(int, void*),
670 sync.callback = callback;
673 return do_sync(group, timeout, call_cb, &sync);
677 * Internal callback for evloop management.
678 * The effect of this function is hidden: it exits
679 * the waiting poll if any. Then it wakes up a thread
680 * awaiting the evloop using signal.
682 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
685 struct evloop *evloop = userdata;
686 read(evloop->efd, &x, sizeof x);
687 pthread_mutex_lock(&mutex);
688 pthread_cond_broadcast(&evloop->cond);
689 pthread_mutex_unlock(&mutex);
694 static void evloop_callback(void *arg, uint32_t event, struct fdev *fdev)
696 sig_monitor(0, evloop_run, arg);
700 * Gets a sd_event item for the current thread.
701 * @return a sd_event or NULL in case of error
703 static struct sd_event *get_sd_event_locked()
709 /* creates the evloop on need */
712 /* start the creation */
714 /* creates the eventfd for waking up polls */
715 el->efd = eventfd(0, EFD_CLOEXEC);
717 ERROR("can't make eventfd for events");
720 /* create the systemd event loop */
721 rc = sd_event_new(&el->sdev);
723 ERROR("can't make new event loop");
726 /* put the eventfd in the event loop */
727 rc = sd_event_add_io(el->sdev, NULL, el->efd, EPOLLIN, on_evloop_efd, el);
729 ERROR("can't register eventfd");
732 /* handle the event loop */
733 el->fdev = fdev_epoll_add(get_fdevepoll(), sd_event_get_fd(el->sdev));
735 ERROR("can't create fdev");
737 sd_event_unref(el->sdev);
741 memset(el, 0, sizeof *el);
744 fdev_set_autoclose(el->fdev, 0);
745 fdev_set_events(el->fdev, EPOLLIN);
746 fdev_set_callback(el->fdev, evloop_callback, el);
749 /* attach the event loop to the current thread */
750 if (current_evloop != el) {
752 __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
754 __atomic_or_fetch(&el->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
757 /* wait for a modifiable event loop */
758 while (__atomic_load_n(&el->state, __ATOMIC_RELAXED) & EVLOOP_STATE_WAIT) {
760 write(el->efd, &x, sizeof x);
761 pthread_cond_wait(&el->cond, &mutex);
768 * Gets a sd_event item for the current thread.
769 * @return a sd_event or NULL in case of error
771 struct sd_event *jobs_get_sd_event()
773 struct sd_event *result;
775 pthread_mutex_lock(&mutex);
776 result = get_sd_event_locked();
777 pthread_mutex_unlock(&mutex);
783 * Gets the fdev_epoll item.
784 * @return a fdev_epoll or NULL in case of error
786 struct fdev_epoll *jobs_get_fdev_epoll()
788 struct fdev_epoll *result;
790 pthread_mutex_lock(&mutex);
791 result = get_fdevepoll();
792 pthread_mutex_unlock(&mutex);
798 * Enter the jobs processing loop.
799 * @param allowed_count Maximum count of threads for jobs, including this one
800 * @param start_count Count of threads to start now; must not exceed allowed_count.
801 * @param waiter_count Maximum count of jobs that can be waiting.
802 * @param start The start routine to activate (can't be NULL)
803 * @return 0 in case of success or -1 in case of error.
805 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
811 assert(allowed_count >= 1);
812 assert(start_count >= 0);
813 assert(waiter_count > 0);
814 assert(start_count <= allowed_count);
817 pthread_mutex_lock(&mutex);
819 /* check whether already running */
820 if (current_thread || allowed) {
821 ERROR("thread already started");
827 if (sig_monitor_init() < 0) {
828 ERROR("failed to initialise signal handlers");
832 /* records the allowed count */
833 allowed = allowed_count;
836 remains = waiter_count;
839 /* set the watchdog */
840 if (sd_watchdog_enabled(0, NULL))
841 sd_event_set_watchdog(get_sd_event_locked(), 1);
844 /* start at least one thread */
846 while ((launched + 1) < start_count) {
847 if (start_one_thread() != 0) {
848 ERROR("Not all threads can be started");
854 /* queue the start job */
855 job = job_create(NULL, 0, start, arg);
857 ERROR("out of memory");
868 pthread_mutex_unlock(&mutex);
873 * Terminate all the threads and cancel all pending jobs.
875 void jobs_terminate()
877 struct job *job, *head, *tail;
878 pthread_t me, *others;
885 /* request all threads to stop */
886 pthread_mutex_lock(&mutex);
889 /* count the number of threads */
893 if (!t->upper && !pthread_equal(t->tid, me))
898 /* fill the array of threads */
899 others = alloca(count * sizeof *others);
903 if (!t->upper && !pthread_equal(t->tid, me))
904 others[count++] = t->tid;
908 /* stops the threads */
915 /* wait for the threads */
916 pthread_cond_broadcast(&cond);
917 pthread_mutex_unlock(&mutex);
919 pthread_join(others[--count], NULL);
920 pthread_mutex_lock(&mutex);
922 /* cancel pending jobs of other threads */
932 /* search if job is stacked for current */
934 while (t && t->job != job)
937 /* yes, relink it at end */
945 /* no, cancel the job */
946 pthread_mutex_unlock(&mutex);
947 sig_monitor(0, job_cancel, job);
949 pthread_mutex_lock(&mutex);
952 pthread_mutex_unlock(&mutex);