#include "sig-monitor.h"
#include "verbose.h"
-#if defined(REMOVE_SYSTEMD_EVENT)
-#include "fdev-epoll.h"
-#endif
-
#define EVENT_TIMEOUT_TOP ((uint64_t)-1)
#define EVENT_TIMEOUT_CHILD ((uint64_t)10000)
/* event loop */
static struct evloop evloop;
-#if defined(REMOVE_SYSTEMD_EVENT)
-static struct fdev_epoll *fdevepoll;
-static int waitevt;
-#endif
-
/**
* Create a new job with the given parameters
* @param group the group of the job
job = malloc(sizeof *job);
pthread_mutex_lock(&mutex);
if (!job) {
+ ERROR("out of memory");
errno = ENOMEM;
goto end;
}
/* queue the jobs */
*pjob = job;
+ remains--;
}
/**
struct job *job = first_job;
while (job && job->blocked)
job = job->next;
+ if (job)
+ remains++;
return job;
}
job->callback(SIGABRT, job->arg);
}
-#if defined(REMOVE_SYSTEMD_EVENT)
-/**
- * Gets a fdev_epoll item.
- * @return a fdev_epoll or NULL in case of error
- */
-static struct fdev_epoll *get_fdevepoll()
-{
- struct fdev_epoll *result;
-
- result = fdevepoll;
- if (!result)
- result = fdevepoll = fdev_epoll_create();
-
- return result;
-}
-#endif
-
/**
* Monitored normal callback for events.
* This function is called by the monitor
{
struct thread *nh, *ct = current_thread;
- if (evloop.holder == ct) {
+ if (ct && evloop.holder == ct) {
nh = ct->nholder;
evloop.holder = nh;
if (nh)
if (evloop.holder)
return evloop.holder == ct;
+ if (!evloop.sdev)
+ return 0;
+
ct->nholder = NULL;
evloop.holder = ct;
return 1;
}
}
-#if defined(REMOVE_SYSTEMD_EVENT)
/**
- * Monitored normal loop for waiting events.
- * @param signum 0 on normal flow or the number
- * of the signal that interrupted the normal
- * flow
- * @param arg the events to run
+ * Enter the thread
+ * @param me the description of the thread to enter
*/
-static void monitored_wait_and_dispatch(int signum, void *arg)
+static void thread_enter(volatile struct thread *me)
{
- struct fdev_epoll *fdev_epoll = arg;
- if (!signum) {
- fdev_epoll_wait_and_dispatch(fdev_epoll, -1);
- }
+ evloop_release();
+ /* initialize description of itself and link it in the list */
+ me->tid = pthread_self();
+ me->stop = 0;
+ me->waits = 0;
+ me->upper = current_thread;
+ me->next = threads;
+ threads = (struct thread*)me;
+ current_thread = (struct thread*)me;
}
-#endif
/**
- * Main processing loop of threads processing jobs.
+ * leave the thread
+ * @param me the description of the thread to leave
+ */
+static void thread_leave()
+{
+ struct thread **prv, *me;
+
+ /* unlink the current thread and cleanup */
+ me = current_thread;
+ prv = &threads;
+ while (*prv != me)
+ prv = &(*prv)->next;
+ *prv = me->next;
+
+ current_thread = me->upper;
+}
+
+/**
+ * Main processing loop of internal threads with processing jobs.
* The loop must be called with the mutex locked
* and it returns with the mutex locked.
* @param me the description of the thread to use
* TODO: how are timeout handled when reentering?
*/
-static void thread_run(volatile struct thread *me)
+static void thread_run_internal(volatile struct thread *me)
{
- struct thread **prv;
struct job *job;
- /* initialize description of itself and link it in the list */
- me->tid = pthread_self();
- me->stop = 0;
- me->waits = 0;
- me->upper = current_thread;
- if (!current_thread) {
- started++;
- sig_monitor_init_timeouts();
- }
- me->next = threads;
- threads = (struct thread*)me;
- current_thread = (struct thread*)me;
+ /* enter thread */
+ thread_enter(me);
/* loop until stopped */
while (!me->stop) {
job = job_get();
if (job) {
/* prepare running the job */
- remains++; /* increases count of job that can wait */
job->blocked = 1; /* mark job as blocked */
me->job = job; /* record the job (only for terminate) */
/* release the run job */
job_release(job);
-#if !defined(REMOVE_SYSTEMD_EVENT)
-
-
-
/* no job, check event loop wait */
} else if (evloop_get()) {
if (evloop.state != 0) {
pthread_cond_wait(&cond, &mutex);
me->waits = 0;
running++;
-#else
- } else if (waitevt) {
- /* no job and not events */
- running--;
- if (!running)
- ERROR("Entering job deep sleep! Check your bindings.");
- me->waits = 1;
- pthread_cond_wait(&cond, &mutex);
- me->waits = 0;
- running++;
- } else {
- /* wait for events */
- waitevt = 1;
- pthread_mutex_unlock(&mutex);
- sig_monitor(0, monitored_wait_and_dispatch, get_fdevepoll());
- pthread_mutex_lock(&mutex);
- waitevt = 0;
-#endif
}
}
-
- /* release the event loop */
+ /* cleanup */
evloop_release();
+ thread_leave();
+}
- /* unlink the current thread and cleanup */
- prv = &threads;
- while (*prv != me)
- prv = &(*prv)->next;
- *prv = me->next;
- current_thread = me->upper;
- if (!current_thread) {
- sig_monitor_clean_timeouts();
- started--;
- }
+/**
+ * Main processing loop of external threads.
+ * The loop must be called with the mutex locked
+ * and it returns with the mutex locked.
+ * @param me the description of the thread to use
+ */
+static void thread_run_external(volatile struct thread *me)
+{
+ /* enter thread */
+ thread_enter(me);
+
+ /* loop until stopped */
+ me->waits = 1;
+ while (!me->stop)
+ pthread_cond_wait(&cond, &mutex);
+ me->waits = 0;
+ thread_leave();
}
/**
- * Entry point for created threads.
- * @param data not used
- * @return NULL
+ * Root for created threads.
*/
-static void *thread_main(void *data)
+static void thread_main()
{
struct thread me;
- pthread_mutex_lock(&mutex);
running++;
- thread_run(&me);
+ started++;
+ sig_monitor_init_timeouts();
+ thread_run_internal(&me);
+ sig_monitor_clean_timeouts();
+ started--;
running--;
+}
+
+/**
+ * Entry point for created threads.
+ * @param data not used
+ * @return NULL
+ */
+static void *thread_starter(void *data)
+{
+ pthread_mutex_lock(&mutex);
+ thread_main();
pthread_mutex_unlock(&mutex);
return NULL;
}
pthread_t tid;
int rc;
- rc = pthread_create(&tid, NULL, thread_main, NULL);
+ rc = pthread_create(&tid, NULL, thread_starter, NULL);
if (rc != 0) {
/* errno = rc; */
WARNING("not able to start thread: %m");
void (*callback)(int, void*),
void *arg)
{
- const char *info;
struct job *job;
int rc;
/* allocates the job */
job = job_create(group, timeout, callback, arg);
- if (!job) {
- errno = ENOMEM;
- info = "out of memory";
+ if (!job)
goto error;
- }
/* check availability */
- if (remains == 0) {
+ if (remains <= 0) {
+ ERROR("can't process job with threads: too many jobs");
errno = EBUSY;
- info = "too many jobs";
goto error2;
}
/* all threads are busy and a new can be started */
rc = start_one_thread();
if (rc < 0 && started == 0) {
- info = "can't start first thread";
+ ERROR("can't start initial thread: %m");
goto error2;
}
}
/* queues the job */
- remains--;
job_add(job);
/* signal an existing job */
job->next = free_jobs;
free_jobs = job;
error:
- ERROR("can't process job with threads: %s, %m", info);
pthread_mutex_unlock(&mutex);
return -1;
}
/* allocates the job */
job = job_create(group, timeout, sync_cb, sync);
if (!job) {
- ERROR("out of memory");
- errno = ENOMEM;
pthread_mutex_unlock(&mutex);
return -1;
}
job_add(job);
/* run until stopped */
- thread_run(&sync->thread);
+ if (current_thread)
+ thread_run_internal(&sync->thread);
+ else
+ thread_run_external(&sync->thread);
pthread_mutex_unlock(&mutex);
return 0;
}
return 1;
}
-/* temporary hack */
-#if !defined(REMOVE_SYSTEMD_EVENT)
-__attribute__((unused))
-#endif
-static void evloop_callback(void *arg, uint32_t event, struct fdev *fdev)
-{
- sig_monitor(0, evloop_run, arg);
-}
-
/**
* Gets a sd_event item for the current thread.
* @return a sd_event or NULL in case of error
rc = sd_event_add_io(evloop.sdev, NULL, evloop.efd, EPOLLIN, on_evloop_efd, NULL);
if (rc < 0) {
ERROR("can't register eventfd");
-#if !defined(REMOVE_SYSTEMD_EVENT)
sd_event_unref(evloop.sdev);
evloop.sdev = NULL;
error2:
error1:
return NULL;
}
-#else
- goto error3;
- }
- /* handle the event loop */
- evloop.fdev = fdev_epoll_add(get_fdevepoll(), sd_event_get_fd(evloop.sdev));
- if (!evloop.fdev) {
- ERROR("can't create fdev");
-error3:
- sd_event_unref(evloop.sdev);
-error2:
- close(evloop.efd);
-error1:
- memset(&evloop, 0, sizeof evloop);
- return NULL;
- }
- fdev_set_autoclose(evloop.fdev, 0);
- fdev_set_events(evloop.fdev, EPOLLIN);
- fdev_set_callback(evloop.fdev, evloop_callback, NULL);
-#endif
}
/* acquire the event loop */
return result;
}
-#if defined(REMOVE_SYSTEMD_EVENT)
-/**
- * Gets the fdev_epoll item.
- * @return a fdev_epoll or NULL in case of error
- */
-struct fdev_epoll *jobs_get_fdev_epoll()
-{
- struct fdev_epoll *result;
-
- pthread_mutex_lock(&mutex);
- result = get_fdevepoll();
- pthread_mutex_unlock(&mutex);
-
- return result;
-}
-#endif
-
/**
* Enter the jobs processing loop.
* @param allowed_count Maximum count of thread for jobs including this one
int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
{
int rc, launched;
- struct thread me;
struct job *job;
assert(allowed_count >= 1);
sd_event_set_watchdog(get_sd_event_locked(), 1);
#endif
- /* start at least one thread */
- launched = 0;
- while ((launched + 1) < start_count) {
+ /* start at least one thread: the current one */
+ launched = 1;
+ while (launched < start_count) {
if (start_one_thread() != 0) {
ERROR("Not all threads can be started");
goto error;
/* queue the start job */
job = job_create(NULL, 0, start, arg);
- if (!job) {
- ERROR("out of memory");
- errno = ENOMEM;
+ if (!job)
goto error;
- }
job_add(job);
- remains--;
/* run until end */
- running++;
- thread_run(&me);
- running--;
+ thread_main();
rc = 0;
error:
pthread_mutex_unlock(&mutex);