#define EVENT_TIMEOUT_CHILD ((uint64_t)10000)
/** Internal shortcut for callback */
-typedef void (*job_cb_t)(int, void*, void *, void*);
+typedef void (*job_cb_t)(int, void*);
/** Description of a pending job */
struct job
{
struct job *next; /**< link to the next job enqueued */
- void *group; /**< group of the request */
+ const void *group; /**< group of the request */
job_cb_t callback; /**< processing callback */
- void *arg1; /**< first arg */
- void *arg2; /**< second arg */
- void *arg3; /**< third arg */
+ void *arg; /**< argument */
int timeout; /**< timeout in second for processing the request */
unsigned blocked: 1; /**< is an other request blocking this one ? */
unsigned dropped: 1; /**< is removed ? */
struct events *next;
struct sd_event *event;
uint64_t timeout;
+ unsigned used: 1;
unsigned runs: 1;
};
*/
struct sync
{
- void (*callback)(int, void*); /**< the synchrnous callback */
- void *arg; /**< the argument of the callback */
+ struct thread thread; /**< thread loop data */
+ union {
+ void (*callback)(int, void*); /**< the synchronous callback */
+ void (*enter)(int signum, void *closure, struct jobloop *jobloop);
+ /**< the entering synchronous routine */
+ };
+ void *arg; /**< the argument of the callback */
};
+
/* synchronisation of threads */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
* @param group the group of the job
* @param timeout the timeout of the job (0 if none)
* @param callback the function that achieves the job
- * @param arg1 the first argument of the callback
- * @param arg2 the second argument of the callback
- * @param arg3 the third argument of the callback
+ * @param arg the argument of the callback
* @return the created job unblock or NULL when no more memory
*/
static struct job *job_create(
- void *group,
+ const void *group,
int timeout,
job_cb_t callback,
- void *arg1,
- void *arg2,
- void *arg3)
+ void *arg)
{
struct job *job;
job->group = group;
job->timeout = timeout;
job->callback = callback;
- job->arg1 = arg1;
- job->arg2 = arg2;
- job->arg3 = arg3;
+ job->arg = arg;
job->blocked = 0;
job->dropped = 0;
end:
*/
static void job_add(struct job *job)
{
- void *group;
+ const void *group;
struct job *ijob, **pjob;
/* prepare to add */
static inline struct events *events_get()
{
struct events *events = first_events;
- while (events && events->runs)
+ while (events && events->used)
events = events->next;
return events;
}
static inline void job_release(struct job *job)
{
struct job *ijob, **pjob;
- void *group;
+ const void *group;
/* first unqueue the job */
pjob = &first_job;
free_jobs = job;
}
-/**
- * Monitored normal callback for a job.
- * This function is called by the monitor
- * to run the job when the safe environment
- * is set.
- * @param signum 0 on normal flow or the number
- * of the signal that interrupted the normal
- * flow
- * @param arg the job to run
- */
-static void job_call(int signum, void *arg)
-{
- struct job *job = arg;
- job->callback(signum, job->arg1, job->arg2, job->arg3);
-}
-
/**
* Monitored cancel callback for a job.
* This function is called by the monitor
*/
static void job_cancel(int signum, void *arg)
{
- job_call(SIGABRT, arg);
+ struct job *job = arg;
+ job->callback(SIGABRT, job->arg);
}
/**
*/
static void thread_run(volatile struct thread *me)
{
- struct thread **prv;
+ struct thread **prv, *thr;
struct job *job;
struct events *events;
uint64_t evto;
if (current) {
current->lowered = 1;
evto = EVENT_TIMEOUT_CHILD;
+ me->events = current->events;
} else {
started++;
sig_monitor_init_timeouts();
evto = EVENT_TIMEOUT_TOP;
+ me->events = NULL;
}
me->next = threads;
threads = (struct thread*)me;
current = (struct thread*)me;
/* loop until stopped */
- me->events = NULL;
while (!me->stop) {
/* get a job */
job = job_get(first_job);
/* run the job */
pthread_mutex_unlock(&mutex);
- sig_monitor(job->timeout, job_call, job);
+ sig_monitor(job->timeout, job->callback, job->arg);
pthread_mutex_lock(&mutex);
/* release the run job */
/* release event if any */
events = me->events;
if (events) {
- events->runs = 0;
+ events->used = 0;
me->events = NULL;
}
} else {
/* no job, check events */
- events = events_get();
+ events = me->events;
+ if (!events || events->runs)
+ events = events_get();
if (events) {
/* run the events */
+ events->used = 1;
events->runs = 1;
events->timeout = evto;
me->events = events;
pthread_mutex_unlock(&mutex);
sig_monitor(0, events_call, events);
pthread_mutex_lock(&mutex);
+ events->used = 0;
events->runs = 0;
me->events = NULL;
+ thr = me->upper;
+ while (thr && thr->events == events) {
+ thr->events = NULL;
+ thr = thr->upper;
+ }
} else {
/* no job and not events */
waiting++;
}
/**
- * Queues a new asynchronous job represented by 'callback'
- * for the 'group' and the 'timeout'.
- * Jobs are queued FIFO and are possibly executed in parallel
- * concurrently except for job of the same group that are
- * executed sequentially in FIFO order.
- * @param group The group of the job or NULL when no group.
- * @param timeout The maximum execution time in seconds of the job
- * or 0 for unlimited time.
- * @param callback The function to execute for achieving the job.
- * Its first parameter is either 0 on normal flow
- * or the signal number that broke the normal flow.
- * @return 0 in case of success or -1 in case of error
- */
-int jobs_queue0(
- void *group,
- int timeout,
- void (*callback)(int signum))
-{
- return jobs_queue3(group, timeout, (job_cb_t)callback, NULL, NULL, NULL);
-}
-
-/**
- * Queues a new asynchronous job represented by 'callback' and 'arg1'
+ * Queues a new asynchronous job represented by 'callback' and 'arg'
* for the 'group' and the 'timeout'.
* Jobs are queued FIFO and are possibly executed in parallel
* concurrently except for job of the same group that are
* @return 0 in case of success or -1 in case of error
*/
int jobs_queue(
- void *group,
+ const void *group,
int timeout,
void (*callback)(int, void*),
void *arg)
-{
- return jobs_queue3(group, timeout, (job_cb_t)callback, arg, NULL, NULL);
-}
-
-/**
- * Queues a new asynchronous job represented by 'callback' and 'arg[12]'
- * for the 'group' and the 'timeout'.
- * Jobs are queued FIFO and are possibly executed in parallel
- * concurrently except for job of the same group that are
- * executed sequentially in FIFO order.
- * @param group The group of the job or NULL when no group.
- * @param timeout The maximum execution time in seconds of the job
- * or 0 for unlimited time.
- * @param callback The function to execute for achieving the job.
- * Its first parameter is either 0 on normal flow
- * or the signal number that broke the normal flow.
- * The remaining parameters are the parameters 'arg[12]'
- * given here.
- * @param arg1 The second argument for 'callback'
- * @param arg2 The third argument for 'callback'
- * @return 0 in case of success or -1 in case of error
- */
-int jobs_queue2(
- void *group,
- int timeout,
- void (*callback)(int, void*, void*),
- void *arg1,
- void *arg2)
-{
- return jobs_queue3(group, timeout, (job_cb_t)callback, arg1, arg2, NULL);
-}
-
-/**
- * Queues a new asynchronous job represented by 'callback' and 'arg[123]'
- * for the 'group' and the 'timeout'.
- * Jobs are queued FIFO and are possibly executed in parallel
- * concurrently except for job of the same group that are
- * executed sequentially in FIFO order.
- * @param group The group of the job or NULL when no group.
- * @param timeout The maximum execution time in seconds of the job
- * or 0 for unlimited time.
- * @param callback The function to execute for achieving the job.
- * Its first parameter is either 0 on normal flow
- * or the signal number that broke the normal flow.
- * The remaining parameters are the parameters 'arg[123]'
- * given here.
- * @param arg1 The second argument for 'callback'
- * @param arg2 The third argument for 'callback'
- * @param arg3 The forth argument for 'callback'
- * @return 0 in case of success or -1 in case of error
- */
-int jobs_queue3(
- void *group,
- int timeout,
- void (*callback)(int, void*, void *, void*),
- void *arg1,
- void *arg2,
- void *arg3)
{
const char *info;
struct job *job;
pthread_mutex_lock(&mutex);
/* allocates the job */
- job = job_create(group, timeout, callback, arg1, arg2, arg3);
+ job = job_create(group, timeout, callback, arg);
if (!job) {
errno = ENOMEM;
info = "out of memory";
}
/**
- * Enter a synchronisation point: activates the job given by 'callback'
- * and 'closure' using 'group' and 'timeout' to control sequencing and
- * execution time.
- * @param group the group for sequencing jobs
- * @param timeout the time in seconds allocated to the job
- * @param callback the callback that will handle the job.
- * it receives 3 parameters: 'signum' that will be 0
- * on normal flow or the catched signal number in case
- * of interrupted flow, the context 'closure' as given and
- * a 'jobloop' reference that must be used when the job is
- * terminated to unlock the current execution flow.
- * @param closure the context completion closure for the callback
- * @return 0 on success or -1 in case of error
+ * Internal helper function for 'jobs_enter'.
+ * @see jobs_enter, jobs_leave
*/
-int jobs_enter(
- void *group,
+static void enter_cb(int signum, void *closure)
+{
+ struct sync *sync = closure;
+ sync->enter(signum, sync->arg, (void*)&sync->thread);
+}
+
+/**
+ * Internal helper function for 'jobs_call'.
+ * @see jobs_call
+ */
+static void call_cb(int signum, void *closure)
+{
+ struct sync *sync = closure;
+ sync->callback(signum, sync->arg);
+ jobs_leave((void*)&sync->thread);
+}
+
+/**
+ * Internal helper for synchronous jobs. It enters
+ * a new thread loop for evaluating the given job
+ * as recorded by the couple 'sync_cb' and 'sync'.
+ * @see jobs_call, jobs_enter, jobs_leave
+ */
+static int do_sync(
+ const void *group,
int timeout,
- void (*callback)(int signum, void *closure, struct jobloop *jobloop),
- void *closure
+ void (*sync_cb)(int signum, void *closure),
+ struct sync *sync
)
{
-
struct job *job;
- struct thread me;
pthread_mutex_lock(&mutex);
/* allocates the job */
- job = job_create(group, timeout, (job_cb_t)callback, closure, &me, NULL);
+ job = job_create(group, timeout, sync_cb, sync);
if (!job) {
ERROR("out of memory");
errno = ENOMEM;
job_add(job);
/* run until stopped */
- thread_run(&me);
+ thread_run(&sync->thread);
pthread_mutex_unlock(&mutex);
return 0;
}
+/**
+ * Enter a synchronisation point: activates the job given by 'callback'
+ * and 'closure' using 'group' and 'timeout' to control sequencing and
+ * execution time.
+ * @param group the group for sequencing jobs
+ * @param timeout the time in seconds allocated to the job
+ * @param callback the callback that will handle the job.
+ * it receives 3 parameters: 'signum' that will be 0
+ * on normal flow or the catched signal number in case
+ * of interrupted flow, the context 'closure' as given and
+ * a 'jobloop' reference that must be used when the job is
+ * terminated to unlock the current execution flow.
+ * @param arg the argument to the callback
+ * @return 0 on success or -1 in case of error
+ */
+int jobs_enter(
+ const void *group,
+ int timeout,
+ void (*callback)(int signum, void *closure, struct jobloop *jobloop),
+ void *closure
+)
+{
+ struct sync sync;
+
+ sync.enter = callback;
+ sync.arg = closure;
+ return do_sync(group, timeout, enter_cb, &sync);
+}
+
/**
* Unlocks the execution flow designed by 'jobloop'.
* @param jobloop indication of the flow to unlock
return -!t;
}
-/**
- * Internal helper function for 'jobs_call'.
- * @see jobs_call, jobs_enter, jobs_leave
- */
-static void call_cb(int signum, void *closure, struct jobloop *jobloop)
-{
- struct sync *sync = closure;
- sync->callback(signum, sync->arg);
- jobs_leave(jobloop);
-}
-
/**
* Calls synchronously the job represented by 'callback' and 'arg1'
* for the 'group' and the 'timeout' and waits for its completion.
* @return 0 in case of success or -1 in case of error
*/
int jobs_call(
- void *group,
+ const void *group,
int timeout,
void (*callback)(int, void*),
void *arg)
sync.callback = callback;
sync.arg = arg;
- return jobs_enter(group, timeout, call_cb, &sync);
+
+ return do_sync(group, timeout, call_cb, &sync);
}
/**
events = malloc(sizeof *events);
if (events && (rc = sd_event_new(&events->event)) >= 0) {
if (nevents < started || start_one_thread() >= 0) {
+ events->used = 0;
events->runs = 0;
events->next = first_events;
first_events = events;
ERROR("creation of sd_event failed: %m");
events = NULL;
errno = -rc;
- }
+ }
}
}
}
if (events) {
- /* */
me = current;
if (me) {
- events->runs = 1;
+ events->used = 1;
me->events = events;
} else {
WARNING("event returned for unknown thread!");
* @param start The start routine to activate (can't be NULL)
* @return 0 in case of success or -1 in case of error.
*/
-int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)())
+int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum))
{
int rc, launched;
struct thread me;
}
/* queue the start job */
- job = job_create(NULL, 0, (job_cb_t)start, NULL, NULL, NULL);
+ job = job_create(NULL, 0, (job_cb_t)start, NULL);
if (!job) {
ERROR("out of memory");
errno = ENOMEM;