- return rc;
-}
-
-/*
- * Disarms the current alarm
- */
-void afb_thread_timer_disarm()
-{
- if (thread_timer_set)
- afb_thread_timer_arm(0);
-}
-
-/*
- * Delstroy any alarm resource for the current thread
- */
-void afb_thread_timer_delete()
-{
- if (thread_timer_set) {
- timer_delete(thread_timerid);
- thread_timer_set = 0;
- }
-}
-
-/* add the job to the list */
-static inline void job_add(struct job *job)
-{
- void *group = job->group;
- struct job *ijob, **pjob;
-
- pjob = &first_job;
- ijob = first_job;
- group = job->group;
- if (group == NULL)
- group = job;
- while (ijob) {
- if (ijob->group == group)
- job->blocked = 1;
- pjob = &ijob->next;
- ijob = ijob->next;
- }
- *pjob = job;
- job->next = NULL;
- remains--;
-}
-
-/* get the next job to process or NULL if none */
-static inline struct job *job_get()
-{
- struct job *job, **pjob;
- pjob = &first_job;
- job = first_job;
- while (job && job->blocked) {
- pjob = &job->next;
- job = job->next;
- }
- if (job) {
- *pjob = job->next;
- remains++;
- }
- return job;
-}
-
-/* unblock a group of job */
-static inline void job_unblock(void *group)
-{
- struct job *job;
-
- job = first_job;
- while (job) {
- if (job->group == group) {
- job->blocked = 0;
- break;
- }
- job = job->next;
- }
-}
-
-/* main loop of processing threads */
-static void *thread_main_loop(void *data)
-{
- struct thread *me = data;
- struct job *job, j;
-
- me->works = 0;
- me->ended = 0;
- afb_thread_timer_create();
- pthread_mutex_lock(&mutex);
- while (!me->stop) {
- /* get a job */
- job = job_get();
- if (job == NULL && first_job != NULL && running == 0) {
- /* sad situation!! should not happen */
- ERROR("threads are blocked!");
- job = first_job;
- first_job = job->next;
- }
- if (job == NULL) {
- /* no job... */
- pthread_cond_wait(&cond, &mutex);
- } else {
- /* run the job */
- running++;
- me->works = 1;
- pthread_mutex_unlock(&mutex);
- j = *job;
- free(job);
- afb_thread_timer_arm(j.timeout);
- afb_sig_req(j.req, j.callback);
- afb_thread_timer_disarm();
- afb_req_unref(j.req);
- pthread_mutex_lock(&mutex);
- if (j.group != NULL)
- job_unblock(j.group);
- me->works = 0;
- running--;
- }
-
- }
- me->ended = 1;
- pthread_mutex_unlock(&mutex);
- afb_thread_timer_delete();
- return me;
-}
-
-/* start a new thread */
-static int start_one_thread()
-{
- struct thread *t;
- int rc;
-
- assert(started < allowed);
-
- t = &threads[started++];
- t->stop = 0;
- rc = pthread_create(&t->tid, NULL, thread_main_loop, t);
- if (rc != 0) {
- started--;
- errno = rc;
- WARNING("not able to start thread: %m");
- rc = -1;
- }
- return rc;
-}
-
-/* process the 'request' with the 'callback' using a separate thread if available */
-void afb_thread_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group)
-{
- const char *info;
- struct job *job;
- int rc;
-
- /* allocates the job */
- job = malloc(sizeof *job);
- if (job == NULL) {
- info = "out of memory";
- goto error;
- }
-
- /* start a thread if needed */
- pthread_mutex_lock(&mutex);
- if (remains == 0) {
- info = "too many jobs";
- goto error2;
- }
- if (started == running && started < allowed) {
- rc = start_one_thread();
- if (rc < 0 && started == 0) {
- /* failed to start threading */
- info = "can't start thread";
- goto error2;
- }
- }
-
- /* fills and queues the job */
- job->callback = callback;
- job->req = req;
- job->timeout = timeout;
- job->group = group;