struct events *next;
struct sd_event *event;
uint64_t timeout;
- unsigned used: 1;
- unsigned runs: 1;
+ enum {
+ Available,
+ Modifiable,
+ Locked
+ } state;
};
/** Description of threads */
struct thread *next; /**< next thread of the list */
struct thread *upper; /**< upper same thread */
struct job *job; /**< currently processed job */
- struct events *events; /**< currently processed job */
pthread_t tid; /**< the thread id */
unsigned stop: 1; /**< stop requested */
- unsigned lowered: 1; /**< has a lower same thread */
unsigned waits: 1; /**< is waiting? */
};
/* list of threads */
static struct thread *threads;
-static _Thread_local struct thread *current;
+static _Thread_local struct thread *current_thread;
+static _Thread_local struct events *current_events;
/* queue of pending jobs */
static struct job *first_job;
static inline struct events *events_get()
{
struct events *events = first_events;
- while (events && events->used)
+ while (events && events->state != Available)
events = events->next;
return events;
}
*/
static void events_call(int signum, void *arg)
{
+ int rc;
+ struct sd_event *se;
struct events *events = arg;
- if (!signum)
- sd_event_run(events->event, events->timeout);
+
+ if (!signum) {
+ se = events->event;
+ rc = sd_event_prepare(se);
+ if (rc < 0) {
+ errno = -rc;
+ ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(events->event));
+ } else {
+ if (rc == 0) {
+ rc = sd_event_wait(se, events->timeout);
+ if (rc < 0) {
+ errno = -rc;
+ ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(events->event));
+ }
+ }
+
+ if (rc > 0) {
+ rc = sd_event_dispatch(se);
+ if (rc < 0) {
+ errno = -rc;
+ ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(events->event));
+ }
+ }
+ }
+ }
}
/**
*/
static void thread_run(volatile struct thread *me)
{
- struct thread **prv, *thr;
+ struct thread **prv;
struct job *job;
struct events *events;
uint64_t evto;
/* initialize description of itself and link it in the list */
me->tid = pthread_self();
me->stop = 0;
- me->lowered = 0;
me->waits = 0;
- me->upper = current;
- if (current) {
- current->lowered = 1;
+ me->upper = current_thread;
+ if (current_thread) {
evto = EVENT_TIMEOUT_CHILD;
- me->events = current->events;
} else {
started++;
sig_monitor_init_timeouts();
evto = EVENT_TIMEOUT_TOP;
- me->events = NULL;
}
me->next = threads;
threads = (struct thread*)me;
- current = (struct thread*)me;
+ current_thread = (struct thread*)me;
/* loop until stopped */
while (!me->stop) {
sig_monitor(job->timeout, job->callback, job->arg);
pthread_mutex_lock(&mutex);
- /* release the run job */
- job_release(job);
-
/* release event if any */
- events = me->events;
- if (events) {
- events->used = 0;
- me->events = NULL;
+ events = current_events;
+ if (events && events->state == Modifiable) {
+ current_events = NULL;
+ events->state = Available;
}
+
+ /* release the run job */
+ job_release(job);
} else {
/* no job, check events */
- events = me->events;
- if (!events || events->runs)
+ events = current_events;
+ if (!events)
events = events_get();
+ else if (events->state == Locked) {
+ events = 0;
+ WARNING("Loosing an event loop because reentering");
+ }
if (events) {
/* run the events */
- events->used = 1;
- events->runs = 1;
+ events->state = Locked;
events->timeout = evto;
- me->events = events;
+ current_events = events;
pthread_mutex_unlock(&mutex);
sig_monitor(0, events_call, events);
pthread_mutex_lock(&mutex);
- events->used = 0;
- events->runs = 0;
- me->events = NULL;
- thr = me->upper;
- while (thr && thr->events == events) {
- thr->events = NULL;
- thr = thr->upper;
- }
+ current_events = NULL;
+ events->state = Available;
} else {
/* no job and not events */
waiting++;
while (*prv != me)
prv = &(*prv)->next;
*prv = me->next;
- current = me->upper;
- if (current) {
- current->lowered = 0;
- } else {
+ current_thread = me->upper;
+ if (!current_thread) {
sig_monitor_clean_timeouts();
started--;
}
struct sd_event *jobs_get_sd_event()
{
struct events *events;
- struct thread *me;
int rc;
pthread_mutex_lock(&mutex);
/* search events on stack */
- me = current;
- while (me && !me->events)
- me = me->upper;
- if (me)
- /* return the stacked events */
- events = me->events;
- else {
+ events = current_events;
+ if (!events) {
/* search an available events */
events = events_get();
if (!events) {
events = malloc(sizeof *events);
if (events && (rc = sd_event_new(&events->event)) >= 0) {
if (nevents < started || start_one_thread() >= 0) {
- events->used = 0;
- events->runs = 0;
+ events->state = Available;
events->next = first_events;
first_events = events;
} else {
}
}
if (events) {
- me = current;
- if (me) {
- events->used = 1;
- me->events = events;
- } else {
+ events->state = Modifiable;
+ if (!current_thread)
WARNING("event returned for unknown thread!");
- }
+ current_events = events;
}
}
pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex);
/* check whether already running */
- if (current || allowed) {
+ if (current_thread || allowed) {
ERROR("thread already started");
errno = EINVAL;
goto error;
head = job->next;
/* search if job is stacked for current */
- t = current;
+ t = current_thread;
while (t && t->job != job)
t = t->upper;
if (t) {