X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fafb-thread.c;h=67870ce6b051168ce66de31522df33114db9346e;hb=feccdb76f572a5fad947475c21b5b9aff696b04b;hp=5ce093005db6a153519ffcea359c2488d0043ed3;hpb=98a5bca16007a7c4740c4326ef83768d034aed3e;p=src%2Fapp-framework-binder.git diff --git a/src/afb-thread.c b/src/afb-thread.c index 5ce09300..67870ce6 100644 --- a/src/afb-thread.c +++ b/src/afb-thread.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2016 "IoT.bzh" + * Copyright (C) 2016, 2017 "IoT.bzh" * Author José Bollo * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,362 +17,44 @@ #define _GNU_SOURCE -#include -#include -#include -#include -#include -#include -#include -#include +#include #include #include "afb-thread.h" -#include "afb-sig-handler.h" +#include "jobs.h" +#include "sig-monitor.h" #include "verbose.h" -/* control of threads */ -struct thread +static void req_call(int signum, void *arg1, void *arg2, void *arg3) { - pthread_t tid; /* the thread id */ - unsigned stop: 1; /* stop request */ - unsigned ended: 1; /* ended status */ - unsigned works: 1; /* is it processing a job? */ -}; + struct afb_req req = { .itf = arg1, .closure = arg2 }; + void (*callback)(struct afb_req) = arg3; -/* describes pending job */ -struct job -{ - void (*callback)(struct afb_req req); /* processing callback */ - struct afb_req req; /* request to be processed */ - int timeout; /* timeout in second for processing the request */ - int blocked; /* is an other request blocking this one ? */ - void *group; /* group of the request */ - struct job *next; /* link to the next job enqueued */ -}; - -/* synchronisation of threads */ -static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; - -/* queue of pending jobs */ -static struct job *first_job = NULL; - -/* count allowed, started and running threads */ -static int allowed = 0; -static int started = 0; -static int running = 0; -static int remains = 0; - -/* list of threads */ -static struct thread *threads = NULL; - -/* local timers */ -static _Thread_local int thread_timer_set; -static _Thread_local timer_t thread_timerid; - -/* - * Creates a timer for the current thread - * - * Returns 0 in case of success - */ -int afb_thread_timer_create() -{ - int rc; - struct sigevent sevp; - - if (thread_timer_set) - rc = 0; - else { - sevp.sigev_notify = SIGEV_THREAD_ID; - sevp.sigev_signo = SIGALRM; - sevp.sigev_value.sival_ptr = NULL; -#if defined(sigev_notify_thread_id) - sevp.sigev_notify_thread_id = (pid_t)syscall(SYS_gettid); -#else - sevp._sigev_un._tid = (pid_t)syscall(SYS_gettid); -#endif - rc = timer_create(CLOCK_THREAD_CPUTIME_ID, &sevp, &thread_timerid); - thread_timer_set = !rc; - } - return 0; + if (signum != 0) + afb_req_fail_f(req, "aborted", "signal %s(%d) caught", strsignal(signum), signum); + else + callback(req); + afb_req_unref(req); } -/* - * Arms the alarm in timeout seconds for the current thread - */ -int afb_thread_timer_arm(int timeout) +void afb_thread_req_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group) { int rc; - struct itimerspec its; - - rc = afb_thread_timer_create(); - if (rc == 0) { - its.it_interval.tv_sec = 0; - its.it_interval.tv_nsec = 0; - its.it_value.tv_sec = timeout; - its.it_value.tv_nsec = 0; - rc = timer_settime(thread_timerid, 0, &its, NULL); - } - return rc; -} - -/* - * Disarms the current alarm - */ -void afb_thread_timer_disarm() -{ - if (thread_timer_set) - afb_thread_timer_arm(0); -} - -/* - * Delstroy any alarm resource for the current thread - */ -void afb_thread_timer_delete() -{ - if (thread_timer_set) { - timer_delete(thread_timerid); - thread_timer_set = 0; - } -} - -/* add the job to the list */ -static inline void job_add(struct job *job) -{ - void *group = job->group; - struct job *ijob, **pjob; - - pjob = &first_job; - ijob = first_job; - group = job->group; - if (group == NULL) - group = job; - while (ijob) { - if (ijob->group == group) - job->blocked = 1; - pjob = &ijob->next; - ijob = ijob->next; - } - *pjob = job; - job->next = NULL; - remains--; -} - -/* get the next job to process or NULL if none */ -static inline struct job *job_get() -{ - struct job *job, **pjob; - pjob = &first_job; - job = first_job; - while (job && job->blocked) { - pjob = &job->next; - job = job->next; - } - if (job) { - *pjob = job->next; - remains++; - } - return job; -} - -/* unblock a group of job */ -static inline void job_unblock(void *group) -{ - struct job *job; - - job = first_job; - while (job) { - if (job->group == group) { - job->blocked = 0; - break; - } - job = job->next; - } -} - -/* main loop of processing threads */ -static void *thread_main_loop(void *data) -{ - struct thread *me = data; - struct job *job, j; - - me->works = 0; - me->ended = 0; - afb_thread_timer_create(); - pthread_mutex_lock(&mutex); - while (!me->stop) { - /* get a job */ - job = job_get(); - if (job == NULL && first_job != NULL && running == 0) { - /* sad situation!! should not happen */ - ERROR("threads are blocked!"); - job = first_job; - first_job = job->next; - } - if (job == NULL) { - /* no job... */ - pthread_cond_wait(&cond, &mutex); - } else { - /* run the job */ - running++; - me->works = 1; - pthread_mutex_unlock(&mutex); - j = *job; - free(job); - afb_thread_timer_arm(j.timeout); - afb_sig_req(j.req, j.callback); - afb_thread_timer_disarm(); - afb_req_unref(j.req); - pthread_mutex_lock(&mutex); - if (j.group != NULL) - job_unblock(j.group); - me->works = 0; - running--; - } - - } - me->ended = 1; - pthread_mutex_unlock(&mutex); - afb_thread_timer_delete(); - return me; -} - -/* start a new thread */ -static int start_one_thread() -{ - struct thread *t; - int rc; - - assert(started < allowed); - - t = &threads[started++]; - t->stop = 0; - rc = pthread_create(&t->tid, NULL, thread_main_loop, t); - if (rc != 0) { - started--; - errno = rc; - WARNING("not able to start thread: %m"); - rc = -1; - } - return rc; -} - -/* process the 'request' with the 'callback' using a separate thread if available */ -void afb_thread_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group) -{ - const char *info; - struct job *job; - int rc; - - /* allocates the job */ - job = malloc(sizeof *job); - if (job == NULL) { - info = "out of memory"; - goto error; - } - - /* start a thread if needed */ - pthread_mutex_lock(&mutex); - if (remains == 0) { - info = "too many jobs"; - goto error2; - } - if (started == running && started < allowed) { - rc = start_one_thread(); - if (rc < 0 && started == 0) { - /* failed to start threading */ - info = "can't start thread"; - goto error2; - } - } - - /* fills and queues the job */ - job->callback = callback; - job->req = req; - job->timeout = timeout; - job->group = group; afb_req_addref(req); - job_add(job); - pthread_mutex_unlock(&mutex); - - /* signal an existing job */ - pthread_cond_signal(&cond); - return; - -error2: - pthread_mutex_unlock(&mutex); - free(job); -error: - ERROR("can't process job with threads: %s", info); - afb_req_fail(req, "internal-error", info); -} - -/* initialise the threads */ -int afb_thread_init(int allowed_count, int start_count, int waiter_count) -{ - threads = calloc(allowed_count, sizeof *threads); - if (threads == NULL) { - errno = ENOMEM; - ERROR("can't allocate threads"); - return -1; - } - - /* records the allowed count */ - allowed = allowed_count; - started = 0; - running = 0; - remains = waiter_count; - - /* start at least one thread */ - pthread_mutex_lock(&mutex); - while (started < start_count && start_one_thread() == 0); - pthread_mutex_unlock(&mutex); - - /* end */ - return -(started != start_count); -} - -/* terminate all the threads and all pending requests */ -void afb_thread_terminate() -{ - int i, n; - struct job *job; - - /* request all threads to stop */ - pthread_mutex_lock(&mutex); - allowed = 0; - n = started; - for (i = 0 ; i < n ; i++) - threads[i].stop = 1; - - /* wait until all thread are terminated */ - while (started != 0) { - /* signal threads */ - pthread_mutex_unlock(&mutex); - pthread_cond_broadcast(&cond); - pthread_mutex_lock(&mutex); - - /* join the terminated threads */ - for (i = 0 ; i < n ; i++) { - if (threads[i].tid && threads[i].ended) { - pthread_join(threads[i].tid, NULL); - threads[i].tid = 0; - started--; - } + if (0) { + /* no threading */ + sig_monitor3(timeout, req_call, (void*)req.itf, req.closure, callback); + } else { + /* threading */ + rc = jobs_queue3(group, timeout, req_call, (void*)req.itf, req.closure, callback); + if (rc < 0) { + /* TODO: allows or not to proccess it directly as when no threading? (see above) */ + ERROR("can't process job with threads: %m"); + afb_req_fail_f(req, "cancelled", "not able to pipe a job for the task"); + afb_req_unref(req); } } - pthread_mutex_unlock(&mutex); - free(threads); - - /* cancel pending jobs */ - while (first_job) { - job = first_job; - first_job = job->next; - afb_req_fail(job->req, "aborted", "termination of threading"); - afb_req_unref(job->req); - free(job); - } } +