2 * Copyright (C) 2016-2019 "IoT.bzh"
3 * Author José Bollo <jose.bollo@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
26 #include <sys/syscall.h>
30 #include <sys/eventfd.h>
32 #include <systemd/sd-event.h>
35 #include "sig-monitor.h"
39 #define EVENT_TIMEOUT_TOP ((uint64_t)-1)
40 #define EVENT_TIMEOUT_CHILD ((uint64_t)10000)
44 /** Internal shortcut for callback */
45 typedef void (*job_cb_t)(int, void*);
47 /** Description of a pending job */
/* NOTE(review): the 'struct job {' opening line is elided in this extract. */
50 struct job *next; /**< link to the next job enqueued */
51 const void *group; /**< group of the request; jobs of the same group run sequentially (see job_add/job_release) */
52 job_cb_t callback; /**< processing callback */
53 void *arg; /**< argument */
54 int timeout; /**< timeout in seconds for processing the request (0 = unlimited) */
55 unsigned blocked: 1; /**< is another request of the same group blocking this one? */
56 unsigned dropped: 1; /**< is removed ? */
59 /** Description of handled event loops */
/* NOTE(review): the 'struct evloop {' opening line is elided in this extract. */
62 unsigned state; /**< encoded state: bitmask of EVLOOP_STATE_WAIT / EVLOOP_STATE_RUN */
63 int efd; /**< eventfd used to wake up a waiting poll (see evloop_wakeup) */
64 struct sd_event *sdev; /**< the systemd event loop */
65 struct thread *holder; /**< holder of the evloop */
/* state bit: a thread is blocked in sd_event_wait and can be woken via efd */
68 #define EVLOOP_STATE_WAIT 1U
/* state bit: the event loop is currently dispatching */
69 #define EVLOOP_STATE_RUN 2U
71 /** Description of threads */
/* NOTE(review): the 'struct thread {' opening line is elided in this extract. */
74 struct thread *next; /**< next thread of the list */
75 struct thread *upper; /**< upper same thread (threads can nest when re-entered) */
76 struct thread *nholder;/**< next holder for evloop (queue of waiters in evloop_acquire) */
77 pthread_cond_t *cwhold;/**< condition wait for holding (signalled by evloop_release) */
78 struct job *job; /**< currently processed job */
79 pthread_t tid; /**< the thread id */
80 volatile unsigned stop: 1; /**< stop requested */
81 volatile unsigned waits: 1; /**< is waiting? */
85 * Description of synchronous callback
/* NOTE(review): the 'struct sync {' opening line is elided in this extract. */
89 struct thread thread; /**< thread loop data; its address doubles as the jobloop handle (see enter_cb/jobs_leave) */
91 void (*callback)(int, void*); /**< the synchronous callback (used by jobs_call via call_cb) */
92 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
93 /**< the entering synchronous routine (used by jobs_enter via enter_cb) */
95 void *arg; /**< the argument of the callback */
99 /* synchronisation of threads: one big lock protecting all the state below */
100 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
101 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
103 /* count allowed, started and running threads */
104 static int allowed = 0; /**< allowed count of threads */
105 static int started = 0; /**< started count of threads */
106 static int running = 0; /**< running count of threads */
107 static int remains = 0; /**< allowed count of waiting jobs */
109 /* list of threads */
110 static struct thread *threads;
111 static _Thread_local struct thread *current_thread;
113 /* queue of pending jobs (FIFO) and recycling list of freed job descriptors */
114 static struct job *first_job;
115 static struct job *free_jobs;
/* the single process-wide event loop descriptor */
118 static struct evloop evloop;
121 * Create a new job with the given parameters
122 * @param group the group of the job
123 * @param timeout the timeout of the job (0 if none)
124 * @param callback the function that achieves the job
125 * @param arg the argument of the callback
126 * @return the created job unblock or NULL when no more memory
/* NOTE(review): called with 'mutex' held; the lock is dropped around malloc below. */
128 static struct job *job_create(
136 /* try recycle existing job from the free_jobs list */
139 free_jobs = job->next;
141 /* allocation without blocking: release the global lock while in malloc */
142 pthread_mutex_unlock(&mutex);
143 job = malloc(sizeof *job);
144 pthread_mutex_lock(&mutex);
146 ERROR("out of memory");
151 /* initialises the job */
153 job->timeout = timeout;
154 job->callback = callback;
163 * Adds 'job' at the end of the list of jobs, marking it
164 * as blocked if another job with the same group is pending.
165 * @param job the job to add
/* NOTE(review): presumably called with 'mutex' held, like the other list helpers — confirm. */
167 static void job_add(struct job *job)
170 struct job *ijob, **pjob;
176 /* search end and blockers: walk the queue looking for a pending job of the same group */
180 if (group && ijob->group == group)
192 * Get the next job to process or NULL if none.
193 * @return the first job that isn't blocked or NULL
/* NOTE(review): presumably called with 'mutex' held — confirm. */
195 static inline struct job *job_get()
197 struct job *job = first_job;
/* skip jobs blocked behind an earlier job of the same group */
198 while (job && job->blocked)
206 * Releases the processed 'job': removes it
207 * from the list of jobs and unblock the first
208 * pending job of the same group if any.
209 * @param job the job to release
/* NOTE(review): presumably called with 'mutex' held — confirm. */
211 static inline void job_release(struct job *job)
213 struct job *ijob, **pjob;
216 /* first unqueue the job */
219 while (ijob != job) {
225 /* then unblock jobs of the same group */
229 while (ijob && ijob->group != group)
235 /* recycle the job descriptor onto the free list instead of freeing it */
236 job->next = free_jobs;
241 * Monitored cancel callback for a job.
242 * This function is called by the monitor
243 * to cancel the job when the safe environment
245 * @param signum 0 on normal flow or the number
246 * of the signal that interrupted the normal
248 * @param arg the job to run
250 static void job_cancel(int signum, void *arg)
252 struct job *job = arg;
/* cancellation is conveyed to the job by invoking it with SIGABRT */
253 job->callback(SIGABRT, job->arg);
257 * Monitored normal callback for events.
258 * This function is called by the monitor
259 * to run the event loop when the safe environment
261 * @param signum 0 on normal flow or the number
262 * of the signal that interrupted the normal
264 * @param arg the events to run
266 static void evloop_run(int signum, void *arg)
/* one prepare / wait / dispatch cycle of the sd-event loop */
273 rc = sd_event_prepare(se);
276 CRITICAL("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
/* (uint64_t)(int64_t)-1 means an infinite wait for sd_event_wait */
280 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
283 ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
/* no longer waiting: from now on a wakeup via the eventfd is not needed */
286 evloop.state = EVLOOP_STATE_RUN;
288 rc = sd_event_dispatch(se);
291 ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
299 * Internal callback for evloop management.
300 * The effect of this function is hidden: it exits
301 * the waiting poll if any.
303 static void evloop_on_efd_event()
/* drain one token from the eventfd; the read result is deliberately ignored */
306 read(evloop.efd, &x, sizeof x);
310 * wakeup the event loop if needed by sending
/* NOTE(review): presumably called with 'mutex' held since it reads evloop.state — confirm. */
313 static void evloop_wakeup()
/* only wake when a thread is actually blocked in sd_event_wait */
317 if (evloop.state & EVLOOP_STATE_WAIT) {
319 write(evloop.efd, &x, sizeof x);
324 * Release the currently held event loop
326 static void evloop_release()
328 struct thread *nh, *ct = current_thread;
/* only the current holder may release; hand over to the next queued holder if any */
330 if (ct && evloop.holder == ct) {
334 pthread_cond_signal(nh->cwhold);
339 * get the eventloop for the current thread
/* @return non-zero when the current thread holds (or just acquired) the evloop */
341 static int evloop_get()
343 struct thread *ct = current_thread;
346 return evloop.holder == ct;
357 * acquire the eventloop for the current thread
/* Blocks until the evloop can be held by the current thread. */
359 static void evloop_acquire()
361 struct thread **pwait, *ct;
364 /* try to get the evloop */
366 /* failed, init waiting state */
/* NOTE(review): 'cond' here is presumably a local pthread_cond_t declared on an
 * elided line (it is init'ed and destroyed below), not the file-scope 'cond' — confirm. */
370 pthread_cond_init(&cond, NULL);
372 /* queue current thread in holder list */
373 pwait = &evloop.holder;
375 pwait = &(*pwait)->nholder;
378 /* wake up the evloop so the current holder notices the new waiter */
381 /* wait to acquire the evloop */
382 pthread_cond_wait(&cond, &mutex);
383 pthread_cond_destroy(&cond);
/* Enters the calling thread in the thread list and makes it current. */
389 * @param me the description of the thread to enter
391 static void thread_enter(volatile struct thread *me)
394 /* initialize description of itself and link it in the list */
395 me->tid = pthread_self();
/* record the previously-current thread to support nested entry (see thread_leave) */
398 me->upper = current_thread;
400 threads = (struct thread*)me;
401 current_thread = (struct thread*)me;
/* Unlinks the current thread from the thread list and restores the upper one. */
406 * @param me the description of the thread to leave
408 static void thread_leave()
410 struct thread **prv, *me;
412 /* unlink the current thread and cleanup */
/* restore the outer nesting level recorded by thread_enter */
419 current_thread = me->upper;
423 * Main processing loop of internal threads with processing jobs.
424 * The loop must be called with the mutex locked
425 * and it returns with the mutex locked.
426 * @param me the description of the thread to use
427 * TODO: how are timeout handled when reentering?
429 static void thread_run_internal(volatile struct thread *me)
436 /* loop until stopped */
438 /* release the current event loop */
444 /* prepare running the job */
445 job->blocked = 1; /* mark job as blocked */
446 me->job = job; /* record the job (only for terminate) */
/* run the job callback outside the lock, under the signal monitor (enforces job->timeout) */
449 pthread_mutex_unlock(&mutex);
450 sig_monitor(job->timeout, job->callback, job->arg);
451 pthread_mutex_lock(&mutex);
453 /* release the run job */
455 /* no job, check event loop wait */
456 } else if (evloop_get()) {
457 if (evloop.state != 0) {
/* re-entering dispatch from within dispatch would deadlock/corrupt the loop state */
459 CRITICAL("Can't enter dispatch while in dispatch!");
463 evloop.state = EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT;
/* run one event-loop cycle outside the lock; no timeout (0) for the monitor */
464 pthread_mutex_unlock(&mutex);
465 sig_monitor(0, evloop_run, NULL);
466 pthread_mutex_lock(&mutex);
469 /* no job and no event loop */
472 ERROR("Entering job deep sleep! Check your bindings.");
474 pthread_cond_wait(&cond, &mutex);
485 * Main processing loop of external threads.
486 * The loop must be called with the mutex locked
487 * and it returns with the mutex locked.
488 * @param me the description of the thread to use
490 static void thread_run_external(volatile struct thread *me)
495 /* loop until stopped */
/* external threads never pick jobs: they only sleep until signalled */
498 pthread_cond_wait(&cond, &mutex);
504 * Root for created threads.
506 static void thread_main()
/* set up per-thread timeout machinery around the internal run loop */
512 sig_monitor_init_timeouts();
513 thread_run_internal(&me);
514 sig_monitor_clean_timeouts();
520 * Entry point for created threads.
521 * @param data not used
524 static void *thread_starter(void *data)
/* thread_main (and the run loops) expect the mutex held on entry/exit */
526 pthread_mutex_lock(&mutex);
528 pthread_mutex_unlock(&mutex);
533 * Starts a new thread
534 * @return 0 in case of success or -1 in case of error
536 static int start_one_thread()
541 rc = pthread_create(&tid, NULL, thread_starter, NULL);
/* failure is a warning only: callers decide whether a missing thread is fatal */
544 WARNING("not able to start thread: %m");
551 * Queues a new asynchronous job represented by 'callback' and 'arg'
552 * for the 'group' and the 'timeout'.
553 * Jobs are queued FIFO and are possibly executed in parallel
554 * concurrently except for job of the same group that are
555 * executed sequentially in FIFO order.
556 * @param group The group of the job or NULL when no group.
557 * @param timeout The maximum execution time in seconds of the job
558 * or 0 for unlimited time.
559 * @param callback The function to execute for achieving the job.
560 * Its first parameter is either 0 on normal flow
561 * or the signal number that broke the normal flow.
562 * The remaining parameter is the parameter 'arg1'
564 * @param arg The second argument for 'callback'
565 * @return 0 in case of success or -1 in case of error
570 void (*callback)(int, void*),
576 pthread_mutex_lock(&mutex);
578 /* allocates the job */
579 job = job_create(group, timeout, callback, arg);
583 /* check availability against the 'remains' budget of waiting jobs */
585 ERROR("can't process job with threads: too many jobs");
590 /* start a thread if needed */
591 if (running == started && started < allowed) {
592 /* all threads are busy and a new can be started */
593 rc = start_one_thread();
/* failing to start is fatal only when no thread exists at all */
594 if (rc < 0 && started == 0) {
595 ERROR("can't start initial thread: %m");
603 /* signal an existing job */
604 pthread_cond_signal(&cond);
605 pthread_mutex_unlock(&mutex);
/* error path: recycle the unqueued job descriptor */
609 job->next = free_jobs;
612 pthread_mutex_unlock(&mutex);
617 * Internal helper function for 'jobs_enter'.
618 * @see jobs_enter, jobs_leave
620 static void enter_cb(int signum, void *closure)
622 struct sync *sync = closure;
/* the address of sync->thread serves as the opaque jobloop handle */
623 sync->enter(signum, sync->arg, (void*)&sync->thread);
627 * Internal helper function for 'jobs_call'.
630 static void call_cb(int signum, void *closure)
632 struct sync *sync = closure;
633 sync->callback(signum, sync->arg);
/* unlike enter_cb, the loop is left automatically once the callback returns */
634 jobs_leave((void*)&sync->thread);
638 * Internal helper for synchronous jobs. It enters
639 * a new thread loop for evaluating the given job
640 * as recorded by the couple 'sync_cb' and 'sync'.
641 * @see jobs_call, jobs_enter, jobs_leave
646 void (*sync_cb)(int signum, void *closure),
652 pthread_mutex_lock(&mutex);
654 /* allocates the job */
655 job = job_create(group, timeout, sync_cb, sync);
657 pthread_mutex_unlock(&mutex);
664 /* run until stopped */
/* NOTE(review): the branch condition choosing internal vs external is elided here;
 * presumably it depends on whether the caller is already a managed thread — confirm. */
666 thread_run_internal(&sync->thread);
668 thread_run_external(&sync->thread);
669 pthread_mutex_unlock(&mutex);
674 * Enter a synchronisation point: activates the job given by 'callback'
675 * and 'closure' using 'group' and 'timeout' to control sequencing and
677 * @param group the group for sequencing jobs
678 * @param timeout the time in seconds allocated to the job
679 * @param callback the callback that will handle the job.
680 * it receives 3 parameters: 'signum' that will be 0
681 * on normal flow or the caught signal number in case
682 * of interrupted flow, the context 'closure' as given and
683 * a 'jobloop' reference that must be used when the job is
684 * terminated to unlock the current execution flow.
685 * @param closure the argument to the callback
686 * @return 0 on success or -1 in case of error
691 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
697 sync.enter = callback;
/* delegate to the common synchronous machinery with the 'enter' flavour */
699 return do_sync(group, timeout, enter_cb, &sync);
703 * Unlocks the execution flow designed by 'jobloop'.
704 * @param jobloop indication of the flow to unlock
705 * @return 0 in case of success of -1 on error
707 int jobs_leave(struct jobloop *jobloop)
711 pthread_mutex_lock(&mutex);
/* the jobloop handle is really a struct thread* (see enter_cb); find it in the list */
713 while (t && t != (struct thread*)jobloop)
720 pthread_cond_broadcast(&cond);
724 pthread_mutex_unlock(&mutex);
729 * Calls synchronously the job represented by 'callback' and 'arg1'
730 * for the 'group' and the 'timeout' and waits for its completion.
731 * @param group The group of the job or NULL when no group.
732 * @param timeout The maximum execution time in seconds of the job
733 * or 0 for unlimited time.
734 * @param callback The function to execute for achieving the job.
735 * Its first parameter is either 0 on normal flow
736 * or the signal number that broke the normal flow.
737 * The remaining parameter is the parameter 'arg1'
739 * @param arg The second argument for 'callback'
740 * @return 0 in case of success or -1 in case of error
745 void (*callback)(int, void*),
750 sync.callback = callback;
/* delegate to the common synchronous machinery with the 'call' flavour */
753 return do_sync(group, timeout, call_cb, &sync);
757 * Internal callback for evloop management.
758 * The effect of this function is hidden: it exits
759 * the waiting poll if any. Then it wakes up a thread
760 * awaiting the evloop using signal.
/* sd-event io callback registered on the eventfd in get_sd_event_locked */
762 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
764 evloop_on_efd_event();
769 * Gets a sd_event item for the current thread.
770 * @return a sd_event or NULL in case of error
/* NOTE(review): 'locked' suffix suggests the caller holds 'mutex' (jobs_acquire_event_manager
 * does lock around the call) — confirm. */
772 static struct sd_event *get_sd_event_locked()
776 /* creates the evloop on need */
778 /* start the creation */
780 /* creates the eventfd for waking up polls */
/* EFD_SEMAPHORE: each read consumes exactly one wakeup token written by evloop_wakeup */
781 evloop.efd = eventfd(0, EFD_CLOEXEC|EFD_SEMAPHORE);
782 if (evloop.efd < 0) {
783 ERROR("can't make eventfd for events");
786 /* create the systemd event loop */
787 evloop.sdev = systemd_get_event_loop();
789 ERROR("can't make event loop");
792 /* put the eventfd in the event loop */
793 rc = sd_event_add_io(evloop.sdev, NULL, evloop.efd, EPOLLIN, on_evloop_efd, NULL);
795 ERROR("can't register eventfd");
804 /* acquire the event loop */
811 * Ensure that the current running thread can control the event loop.
813 void jobs_acquire_event_manager()
817 /* ensure an existing thread environment */
/* fake a minimal thread descriptor when called from an unmanaged thread */
818 if (!current_thread) {
819 memset(&lt, 0, sizeof lt);
820 current_thread = &lt;
/* serialize access to the evloop state */
824 pthread_mutex_lock(&mutex);
825 get_sd_event_locked();
826 pthread_mutex_unlock(&mutex);
828 /* release the faked thread environment if needed */
829 if (current_thread == &lt) {
831 * Releasing it is needed because there is no way to guess
832 * when it has to be released really. But here is where it is
833 * hazardous: if the caller modifies the eventloop when it
834 * is waiting, there is no way to make the change effective.
835 * A workaround to achieve that goal is for the caller to
836 * require the event loop a second time after having modified it.
838 NOTICE("Requiring sd_event loop out of binder callbacks is hazardous!");
839 if (verbose_wants(Log_Level_Info))
840 sig_monitor_dumpstack();
842 current_thread = NULL;
847 * Enter the jobs processing loop.
848 * @param allowed_count Maximum count of thread for jobs including this one
849 * @param start_count Count of thread to start now, must be lower.
850 * @param waiter_count Maximum count of jobs that can be waiting.
851 * @param start The start routine to activate (can't be NULL)
852 * @return 0 in case of success or -1 in case of error.
854 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
/* contract checks on the configuration parameters */
859 assert(allowed_count >= 1);
860 assert(start_count >= 0);
861 assert(waiter_count > 0);
862 assert(start_count <= allowed_count);
865 pthread_mutex_lock(&mutex);
867 /* check whether already running */
868 if (current_thread || allowed) {
869 ERROR("thread already started");
874 /* records the allowed count */
875 allowed = allowed_count;
878 remains = waiter_count;
880 /* start at least one thread: the current one */
882 while (launched < start_count) {
883 if (start_one_thread() != 0) {
/* non-fatal: continue with fewer worker threads */
884 ERROR("Not all threads can be started");
890 /* queue the start job */
891 job = job_create(NULL, 0, start, arg);
900 pthread_mutex_unlock(&mutex);
905 * Terminate all the threads and cancel all pending jobs.
907 void jobs_terminate()
909 struct job *job, *head, *tail;
910 pthread_t me, *others;
917 /* request all threads to stop */
918 pthread_mutex_lock(&mutex);
921 /* count the number of threads (top-level only, excluding the caller) */
925 if (!t->upper && !pthread_equal(t->tid, me))
930 /* fill the array of threads */
/* stack allocation sized by the count computed just above */
931 others = alloca(count * sizeof *others);
935 if (!t->upper && !pthread_equal(t->tid, me))
936 others[count++] = t->tid;
940 /* stops the threads */
947 /* wait the threads */
948 pthread_cond_broadcast(&cond);
/* join outside the lock so stopping threads can re-acquire it on their way out */
949 pthread_mutex_unlock(&mutex);
951 pthread_join(others[--count], NULL);
952 pthread_mutex_lock(&mutex);
954 /* cancel pending jobs of other threads */
964 /* search if job is stacked for current */
966 while (t && t->job != job)
969 /* yes, relink it at end */
977 /* no cancel the job: invoke it once with SIGABRT via the monitor */
978 pthread_mutex_unlock(&mutex);
979 sig_monitor(0, job_cancel, job);
981 pthread_mutex_lock(&mutex);
984 pthread_mutex_unlock(&mutex);