struct job
{
struct job *next; /**< link to the next job enqueued */
- void *group; /**< group of the request */
+ const void *group; /**< group of the request */
job_cb_t callback; /**< processing callback */
void *arg; /**< argument */
int timeout; /**< timeout in second for processing the request */
struct events *next;
struct sd_event *event;
uint64_t timeout;
- unsigned used: 1;
- unsigned runs: 1;
+ enum {
+ Available,
+ Modifiable,
+ Locked
+ } state;
};
/** Description of threads */
struct thread *next; /**< next thread of the list */
struct thread *upper; /**< upper same thread */
struct job *job; /**< currently processed job */
- struct events *events; /**< currently processed job */
pthread_t tid; /**< the thread id */
unsigned stop: 1; /**< stop requested */
- unsigned lowered: 1; /**< has a lower same thread */
unsigned waits: 1; /**< is waiting? */
};
/* list of threads */
static struct thread *threads;
-static _Thread_local struct thread *current;
+static _Thread_local struct thread *current_thread;
+static _Thread_local struct events *current_events;
/* queue of pending jobs */
static struct job *first_job;
* @return the created job unblock or NULL when no more memory
*/
static struct job *job_create(
- void *group,
+ const void *group,
int timeout,
job_cb_t callback,
void *arg)
*/
static void job_add(struct job *job)
{
- void *group;
+ const void *group;
struct job *ijob, **pjob;
/* prepare to add */
static inline struct events *events_get()
{
struct events *events = first_events;
- while (events && events->used)
+ while (events && events->state != Available)
events = events->next;
return events;
}
static inline void job_release(struct job *job)
{
struct job *ijob, **pjob;
- void *group;
+ const void *group;
/* first unqueue the job */
pjob = &first_job;
*/
static void events_call(int signum, void *arg)
{
+ int rc;
+ struct sd_event *se;
struct events *events = arg;
- if (!signum)
- sd_event_run(events->event, events->timeout);
+
+ if (!signum) {
+ se = events->event;
+ rc = sd_event_prepare(se);
+ if (rc < 0) {
+ errno = -rc;
+ ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(events->event));
+ } else {
+ if (rc == 0) {
+ rc = sd_event_wait(se, events->timeout);
+ if (rc < 0) {
+ errno = -rc;
+ ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(events->event));
+ }
+ }
+
+ if (rc > 0) {
+ rc = sd_event_dispatch(se);
+ if (rc < 0) {
+ errno = -rc;
+ ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(events->event));
+ }
+ }
+ }
+ }
}
/**
*/
static void thread_run(volatile struct thread *me)
{
- struct thread **prv, *thr;
+ struct thread **prv;
struct job *job;
struct events *events;
uint64_t evto;
/* initialize description of itself and link it in the list */
me->tid = pthread_self();
me->stop = 0;
- me->lowered = 0;
me->waits = 0;
- me->upper = current;
- if (current) {
- current->lowered = 1;
+ me->upper = current_thread;
+ if (current_thread) {
evto = EVENT_TIMEOUT_CHILD;
} else {
started++;
}
me->next = threads;
threads = (struct thread*)me;
- current = (struct thread*)me;
+ current_thread = (struct thread*)me;
/* loop until stopped */
- me->events = NULL;
while (!me->stop) {
/* get a job */
job = job_get(first_job);
sig_monitor(job->timeout, job->callback, job->arg);
pthread_mutex_lock(&mutex);
- /* release the run job */
- job_release(job);
-
/* release event if any */
- events = me->events;
- if (events) {
- events->used = 0;
- me->events = NULL;
+ events = current_events;
+ if (events && events->state == Modifiable) {
+ current_events = NULL;
+ events->state = Available;
}
+
+ /* release the run job */
+ job_release(job);
} else {
/* no job, check events */
- thr = (struct thread*)me;
- events = NULL;
- while (thr && !(events = thr->events))
- thr = thr->upper;
- if (events && !events->runs) {
+ events = current_events;
+ if (!events)
+ events = events_get();
+ else if (events->state == Locked) {
+ events = 0;
+ WARNING("Loosing an event loop because reentering");
+ }
+ if (events) {
/* run the events */
- events->runs = 1;
+ events->state = Locked;
events->timeout = evto;
- me->events = events;
+ current_events = events;
pthread_mutex_unlock(&mutex);
sig_monitor(0, events_call, events);
pthread_mutex_lock(&mutex);
- events->runs = 0;
- me->events = NULL;
+ current_events = NULL;
+ events->state = Available;
} else {
- /* no owned event, check events */
- events = events_get();
- if (events) {
- /* run the events */
- events->used = 1;
- events->runs = 1;
- events->timeout = evto;
- me->events = events;
- pthread_mutex_unlock(&mutex);
- sig_monitor(0, events_call, events);
- pthread_mutex_lock(&mutex);
- events->used = 0;
- events->runs = 0;
- me->events = NULL;
- } else {
- /* no job and not events */
- waiting++;
- me->waits = 1;
- pthread_cond_wait(&cond, &mutex);
- me->waits = 0;
- waiting--;
- }
+ /* no job and not events */
+ waiting++;
+ me->waits = 1;
+ pthread_cond_wait(&cond, &mutex);
+ me->waits = 0;
+ waiting--;
}
}
}
while (*prv != me)
prv = &(*prv)->next;
*prv = me->next;
- current = me->upper;
- if (current) {
- current->lowered = 0;
- } else {
+ current_thread = me->upper;
+ if (!current_thread) {
sig_monitor_clean_timeouts();
started--;
}
* @return 0 in case of success or -1 in case of error
*/
int jobs_queue(
- void *group,
+ const void *group,
int timeout,
void (*callback)(int, void*),
void *arg)
* @see jobs_call, jobs_enter, jobs_leave
*/
static int do_sync(
- void *group,
+ const void *group,
int timeout,
void (*sync_cb)(int signum, void *closure),
struct sync *sync
* @return 0 on success or -1 in case of error
*/
int jobs_enter(
- void *group,
+ const void *group,
int timeout,
void (*callback)(int signum, void *closure, struct jobloop *jobloop),
void *closure
* @return 0 in case of success or -1 in case of error
*/
int jobs_call(
- void *group,
+ const void *group,
int timeout,
void (*callback)(int, void*),
void *arg)
struct sd_event *jobs_get_sd_event()
{
struct events *events;
- struct thread *me;
int rc;
pthread_mutex_lock(&mutex);
/* search events on stack */
- me = current;
- while (me && !me->events)
- me = me->upper;
- if (me)
- /* return the stacked events */
- events = me->events;
- else {
+ events = current_events;
+ if (!events) {
/* search an available events */
events = events_get();
if (!events) {
events = malloc(sizeof *events);
if (events && (rc = sd_event_new(&events->event)) >= 0) {
if (nevents < started || start_one_thread() >= 0) {
- events->used = 0;
- events->runs = 0;
+ events->state = Available;
events->next = first_events;
first_events = events;
} else {
ERROR("creation of sd_event failed: %m");
events = NULL;
errno = -rc;
- }
+ }
}
}
}
if (events) {
- me = current;
- if (me) {
- events->used = 1;
- me->events = events;
- } else {
+ events->state = Modifiable;
+ if (!current_thread)
WARNING("event returned for unknown thread!");
- }
+ current_events = events;
}
}
pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex);
/* check whether already running */
- if (current || allowed) {
+ if (current_thread || allowed) {
ERROR("thread already started");
errno = EINVAL;
goto error;
head = job->next;
/* search if job is stacked for current */
- t = current;
+ t = current_thread;
while (t && t->job != job)
t = t->upper;
if (t) {