From feccdb76f572a5fad947475c21b5b9aff696b04b Mon Sep 17 00:00:00 2001 From: =?utf8?q?Jos=C3=A9=20Bollo?= Date: Wed, 22 Mar 2017 16:49:53 +0100 Subject: [PATCH] Refactor of threading and signal monitor MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The goal is to allow use of this facilities for things that are not 'afb_req'. Change-Id: I0d99c227934ed45136477bf6235bd1541d5f05cf Signed-off-by: José Bollo --- src/CMakeLists.txt | 5 +- src/afb-api-so.c | 8 +- src/afb-sig-handler.c | 125 ----------- src/afb-thread.c | 367 ++----------------------------- src/afb-thread.h | 10 +- src/jobs.c | 344 +++++++++++++++++++++++++++++ src/jobs.h | 44 ++++ src/main.c | 8 +- src/sig-monitor.c | 198 +++++++++++++++++ src/{afb-sig-handler.h => sig-monitor.h} | 15 +- 10 files changed, 626 insertions(+), 498 deletions(-) delete mode 100644 src/afb-sig-handler.c create mode 100644 src/jobs.c create mode 100644 src/jobs.h create mode 100644 src/sig-monitor.c rename src/{afb-sig-handler.h => sig-monitor.h} (57%) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 8d0121a5..6cbc6fa3 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -69,7 +69,6 @@ ADD_LIBRARY(afb-lib STATIC afb-method.c afb-msg-json.c afb-session.c - afb-sig-handler.c afb-svc.c afb-subcall.c afb-thread.c @@ -78,10 +77,12 @@ ADD_LIBRARY(afb-lib STATIC afb-ws-json1.c afb-ws.c afb-wsj1.c + jobs.c locale-root.c + sd-fds.c + sig-monitor.c verbose.c websock.c - sd-fds.c ) ########################################### diff --git a/src/afb-api-so.c b/src/afb-api-so.c index 74f94f36..222fbbbd 100644 --- a/src/afb-api-so.c +++ b/src/afb-api-so.c @@ -38,7 +38,6 @@ #include "afb-context.h" #include "afb-apis.h" #include "afb-api-so.h" -#include "afb-sig-handler.h" #include "afb-thread.h" #include "afb-evt.h" #include "afb-svc.h" @@ -194,12 +193,7 @@ static void call_cb(void *closure, struct afb_req req, struct afb_context *conte if (!verb->name) afb_req_fail_f(req, "unknown-verb", "verb %.*s unknown within api %s", (int)lenverb, strverb, desc->binding->v1.prefix); else if (call_check(req, context, verb)) { - if (0) - /* not threaded */ - afb_sig_req_timeout(req, verb->callback, api_timeout); - else - /* threaded */ - afb_thread_call(req, verb->callback, api_timeout, desc); + afb_thread_req_call(req, verb->callback, api_timeout, desc); } } diff --git a/src/afb-sig-handler.c b/src/afb-sig-handler.c deleted file mode 100644 index bbf1531a..00000000 --- a/src/afb-sig-handler.c +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright (C) 2015, 2016, 2017 "IoT.bzh" - * Author "Fulup Ar Foll" - * Author José Bollo - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#define _GNU_SOURCE - -#include -#include -#include -#include - -#include - -#include "afb-sig-handler.h" -#include "afb-thread.h" -#include "verbose.h" - -static _Thread_local sigjmp_buf *error_handler; - -static void on_signal_terminate (int signum) -{ - ERROR("Terminating signal received %s", strsignal(signum)); - exit(1); -} - -static void on_signal_error(int signum) -{ - sigset_t sigset; - - // unlock signal to allow a new signal to come - if (error_handler != NULL) { - sigemptyset(&sigset); - sigaddset(&sigset, signum); - sigprocmask(SIG_UNBLOCK, &sigset, 0); - longjmp(*error_handler, signum); - } - if (signum == SIGALRM) - return; - ERROR("Unmonitored signal received %s", strsignal(signum)); - exit(2); -} - -static int install(void (*handler)(int), int *signals) -{ - int result = 1; - while(*signals > 0) { - if (signal(*signals, handler) == SIG_ERR) { - ERROR("failed to install signal handler for signal %s", strsignal(*signals)); - result = 0; - } - signals++; - } - return result; -} - -int afb_sig_handler_init() -{ - static int sigerr[] = { SIGALRM, SIGSEGV, SIGFPE, 0 }; - static int sigterm[] = { SIGINT, SIGABRT, 0 }; - - return (install(on_signal_error, sigerr) & install(on_signal_terminate, sigterm)) - 1; -} - -int afb_sig_req(struct afb_req req, void (*callback)(struct afb_req req)) -{ - volatile int signum; - sigjmp_buf jmpbuf, *older; - - older = error_handler; - signum = setjmp(jmpbuf); - if (signum != 0) - afb_req_fail_f(req, "aborted", "signal %s(%d) caught", strsignal(signum), signum); - else { - error_handler = &jmpbuf; - callback(req); - } - error_handler = older; - return signum; -} - -int afb_sig_req_timeout(struct afb_req req, void (*callback)(struct afb_req req), int timeout) -{ - int rc; - - if (timeout) - afb_thread_timer_arm(timeout); - rc = afb_sig_req(req, callback); - afb_thread_timer_disarm(); - return rc; -} - -void afb_sig_monitor(void (*function)(int sig, void*), void *closure, int timeout) -{ - volatile int signum; - sigjmp_buf jmpbuf, *older; - - older = error_handler; - signum = setjmp(jmpbuf); - if (signum != 0) { - function(signum, closure); - } - else { - error_handler = &jmpbuf; - if (timeout) - afb_thread_timer_arm(timeout); - function(0, closure); - } - afb_thread_timer_disarm(); - error_handler = older; -} - diff --git a/src/afb-thread.c b/src/afb-thread.c index 790b86b0..67870ce6 100644 --- a/src/afb-thread.c +++ b/src/afb-thread.c @@ -17,363 +17,44 @@ #define _GNU_SOURCE -#include -#include -#include -#include -#include -#include -#include -#include +#include #include #include "afb-thread.h" -#include "afb-sig-handler.h" +#include "jobs.h" +#include "sig-monitor.h" #include "verbose.h" -/* control of threads */ -struct thread +static void req_call(int signum, void *arg1, void *arg2, void *arg3) { - pthread_t tid; /* the thread id */ - unsigned stop: 1; /* stop request */ - unsigned ended: 1; /* ended status */ - unsigned works: 1; /* is it processing a job? */ -}; + struct afb_req req = { .itf = arg1, .closure = arg2 }; + void (*callback)(struct afb_req) = arg3; -/* describes pending job */ -struct job -{ - void (*callback)(struct afb_req req); /* processing callback */ - struct afb_req req; /* request to be processed */ - int timeout; /* timeout in second for processing the request */ - int blocked; /* is an other request blocking this one ? */ - void *group; /* group of the request */ - struct job *next; /* link to the next job enqueued */ -}; - -/* synchronisation of threads */ -static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; - -/* queue of pending jobs */ -static struct job *first_job = NULL; - -/* count allowed, started and running threads */ -static int allowed = 0; -static int started = 0; -static int running = 0; -static int remains = 0; - -/* list of threads */ -static struct thread *threads = NULL; - -/* local timers */ -static _Thread_local int thread_timer_set; -static _Thread_local timer_t thread_timerid; - -/* - * Creates a timer for the current thread - * - * Returns 0 in case of success - */ -int afb_thread_timer_create() -{ - int rc; - struct sigevent sevp; - - if (thread_timer_set) - rc = 0; - else { - sevp.sigev_notify = SIGEV_THREAD_ID; - sevp.sigev_signo = SIGALRM; - sevp.sigev_value.sival_ptr = NULL; -#if defined(sigev_notify_thread_id) - sevp.sigev_notify_thread_id = (pid_t)syscall(SYS_gettid); -#else - sevp._sigev_un._tid = (pid_t)syscall(SYS_gettid); -#endif - rc = timer_create(CLOCK_THREAD_CPUTIME_ID, &sevp, &thread_timerid); - thread_timer_set = !rc; - } - return 0; + if (signum != 0) + afb_req_fail_f(req, "aborted", "signal %s(%d) caught", strsignal(signum), signum); + else + callback(req); + afb_req_unref(req); } -/* - * Arms the alarm in timeout seconds for the current thread - */ -int afb_thread_timer_arm(int timeout) +void afb_thread_req_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group) { int rc; - struct itimerspec its; - - rc = afb_thread_timer_create(); - if (rc == 0) { - its.it_interval.tv_sec = 0; - its.it_interval.tv_nsec = 0; - its.it_value.tv_sec = timeout; - its.it_value.tv_nsec = 0; - rc = timer_settime(thread_timerid, 0, &its, NULL); - } - return rc; -} - -/* - * Disarms the current alarm - */ -void afb_thread_timer_disarm() -{ - if (thread_timer_set) - afb_thread_timer_arm(0); -} - -/* - * Delstroy any alarm resource for the current thread - */ -void afb_thread_timer_delete() -{ - if (thread_timer_set) { - timer_delete(thread_timerid); - thread_timer_set = 0; - } -} - -/* add the job to the list */ -static inline void job_add(struct job *job) -{ - void *group = job->group; - struct job *ijob, **pjob; - - pjob = &first_job; - ijob = first_job; - group = job->group; - if (group == NULL) - group = job; - while (ijob) { - if (ijob->group == group) - job->blocked = 1; - pjob = &ijob->next; - ijob = ijob->next; - } - *pjob = job; - job->next = NULL; - remains--; -} - -/* get the next job to process or NULL if none */ -static inline struct job *job_get() -{ - struct job *job, **pjob; - pjob = &first_job; - job = first_job; - while (job && job->blocked) { - pjob = &job->next; - job = job->next; - } - if (job) { - *pjob = job->next; - remains++; - } - return job; -} - -/* unblock a group of job */ -static inline void job_unblock(void *group) -{ - struct job *job; - - job = first_job; - while (job) { - if (job->group == group) { - job->blocked = 0; - break; - } - job = job->next; - } -} - -/* main loop of processing threads */ -static void *thread_main_loop(void *data) -{ - struct thread *me = data; - struct job *job, j; - - me->works = 0; - me->ended = 0; - afb_thread_timer_create(); - pthread_mutex_lock(&mutex); - while (!me->stop) { - /* get a job */ - job = job_get(); - if (job == NULL && first_job != NULL && running == 0) { - /* sad situation!! should not happen */ - ERROR("threads are blocked!"); - job = first_job; - first_job = job->next; - } - if (job == NULL) { - /* no job... */ - pthread_cond_wait(&cond, &mutex); - } else { - /* run the job */ - running++; - me->works = 1; - pthread_mutex_unlock(&mutex); - j = *job; - free(job); - afb_thread_timer_arm(j.timeout); - afb_sig_req(j.req, j.callback); - afb_thread_timer_disarm(); - afb_req_unref(j.req); - pthread_mutex_lock(&mutex); - if (j.group != NULL) - job_unblock(j.group); - me->works = 0; - running--; - } - - } - me->ended = 1; - pthread_mutex_unlock(&mutex); - afb_thread_timer_delete(); - return me; -} - -/* start a new thread */ -static int start_one_thread() -{ - struct thread *t; - int rc; - - assert(started < allowed); - - t = &threads[started++]; - t->stop = 0; - rc = pthread_create(&t->tid, NULL, thread_main_loop, t); - if (rc != 0) { - started--; - errno = rc; - WARNING("not able to start thread: %m"); - rc = -1; - } - return rc; -} - -/* process the 'request' with the 'callback' using a separate thread if available */ -void afb_thread_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group) -{ - const char *info; - struct job *job; - int rc; - - /* allocates the job */ - job = malloc(sizeof *job); - if (job == NULL) { - info = "out of memory"; - goto error; - } - - /* start a thread if needed */ - pthread_mutex_lock(&mutex); - if (remains == 0) { - info = "too many jobs"; - goto error2; - } - if (started == running && started < allowed) { - rc = start_one_thread(); - if (rc < 0 && started == 0) { - /* failed to start threading */ - info = "can't start thread"; - goto error2; - } - } - - /* fills and queues the job */ - job->callback = callback; - job->req = req; - job->timeout = timeout; - job->blocked = 0; - job->group = group; afb_req_addref(req); - job_add(job); - pthread_mutex_unlock(&mutex); - - /* signal an existing job */ - pthread_cond_signal(&cond); - return; - -error2: - pthread_mutex_unlock(&mutex); - free(job); -error: - ERROR("can't process job with threads: %s", info); - afb_req_fail(req, "internal-error", info); -} - -/* initialise the threads */ -int afb_thread_init(int allowed_count, int start_count, int waiter_count) -{ - threads = calloc(allowed_count, sizeof *threads); - if (threads == NULL) { - errno = ENOMEM; - ERROR("can't allocate threads"); - return -1; - } - - /* records the allowed count */ - allowed = allowed_count; - started = 0; - running = 0; - remains = waiter_count; - - /* start at least one thread */ - pthread_mutex_lock(&mutex); - while (started < start_count && start_one_thread() == 0); - pthread_mutex_unlock(&mutex); - - /* end */ - return -(started != start_count); -} - -/* terminate all the threads and all pending requests */ -void afb_thread_terminate() -{ - int i, n; - struct job *job; - - /* request all threads to stop */ - pthread_mutex_lock(&mutex); - allowed = 0; - n = started; - for (i = 0 ; i < n ; i++) - threads[i].stop = 1; - - /* wait until all thread are terminated */ - while (started != 0) { - /* signal threads */ - pthread_mutex_unlock(&mutex); - pthread_cond_broadcast(&cond); - pthread_mutex_lock(&mutex); - - /* join the terminated threads */ - for (i = 0 ; i < n ; i++) { - if (threads[i].tid && threads[i].ended) { - pthread_join(threads[i].tid, NULL); - threads[i].tid = 0; - started--; - } + if (0) { + /* no threading */ + sig_monitor3(timeout, req_call, (void*)req.itf, req.closure, callback); + } else { + /* threading */ + rc = jobs_queue3(group, timeout, req_call, (void*)req.itf, req.closure, callback); + if (rc < 0) { + /* TODO: allows or not to proccess it directly as when no threading? (see above) */ + ERROR("can't process job with threads: %m"); + afb_req_fail_f(req, "cancelled", "not able to pipe a job for the task"); + afb_req_unref(req); } } - pthread_mutex_unlock(&mutex); - free(threads); - - /* cancel pending jobs */ - while (first_job) { - job = first_job; - first_job = job->next; - afb_req_fail(job->req, "aborted", "termination of threading"); - afb_req_unref(job->req); - free(job); - } } + diff --git a/src/afb-thread.h b/src/afb-thread.h index 0559dfc0..4e44b55e 100644 --- a/src/afb-thread.h +++ b/src/afb-thread.h @@ -19,13 +19,5 @@ struct afb_req; -extern void afb_thread_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group); - -extern int afb_thread_init(int allowed_count, int start_count, int waiter_count); -extern void afb_thread_terminate(); - -extern int afb_thread_timer_create(); -extern int afb_thread_timer_arm(int timeout); -extern void afb_thread_timer_disarm(); -extern void afb_thread_timer_delete(); +extern void afb_thread_req_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group); diff --git a/src/jobs.c b/src/jobs.c new file mode 100644 index 00000000..c4f32244 --- /dev/null +++ b/src/jobs.c @@ -0,0 +1,344 @@ +/* + * Copyright (C) 2016, 2017 "IoT.bzh" + * Author José Bollo + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "jobs.h" +#include "sig-monitor.h" +#include "verbose.h" + +/* control of threads */ +struct thread +{ + pthread_t tid; /* the thread id */ + unsigned stop: 1; /* stop request */ + unsigned ended: 1; /* ended status */ + unsigned works: 1; /* is it processing a job? */ +}; + +/* describes pending job */ +struct job +{ + struct job *next; /* link to the next job enqueued */ + void *group; /* group of the request */ + void (*callback)(int,void*,void*,void*); /* processing callback */ + void *arg1; /* first arg */ + void *arg2; /* second arg */ + void *arg3; /* second arg */ + int timeout; /* timeout in second for processing the request */ + int blocked; /* is an other request blocking this one ? */ +}; + +/* synchronisation of threads */ +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; + +/* queue of pending jobs */ +static struct job *first_job = NULL; + +/* count allowed, started and running threads */ +static int allowed = 0; +static int started = 0; +static int running = 0; +static int remains = 0; + +/* list of threads */ +static struct thread *threads = NULL; + +/* add the job to the list */ +static inline void job_add(struct job *job) +{ + void *group = job->group; + struct job *ijob, **pjob; + + pjob = &first_job; + ijob = first_job; + group = job->group ? : job; + while (ijob) { + if (ijob->group == group) + job->blocked = 1; + pjob = &ijob->next; + ijob = ijob->next; + } + *pjob = job; + job->next = NULL; + remains--; +} + +/* get the next job to process or NULL if none */ +static inline struct job *job_get() +{ + struct job *job, **pjob; + pjob = &first_job; + job = first_job; + while (job && job->blocked) { + pjob = &job->next; + job = job->next; + } + if (job) { + *pjob = job->next; + remains++; + } + return job; +} + +/* unblock a group of job */ +static inline void job_unblock(void *group) +{ + struct job *job; + + job = first_job; + while (job) { + if (job->group == group) { + job->blocked = 0; + break; + } + job = job->next; + } +} + +/* call the job */ +static inline void job_call(int signum, void *arg) +{ + struct job *job = arg; + job->callback(signum, job->arg1, job->arg2, job->arg3); +} + +/* cancel the job */ +static inline void job_cancel(int signum, void *arg) +{ + struct job *job = arg; + job->callback(SIGABRT, job->arg1, job->arg2, job->arg3); +} + +/* main loop of processing threads */ +static void *thread_main_loop(void *data) +{ + struct thread *me = data; + struct job *job; + + me->works = 0; + me->ended = 0; + sig_monitor_init_timeouts(); + pthread_mutex_lock(&mutex); + while (!me->stop) { + /* get a job */ + job = job_get(); + if (job == NULL && first_job != NULL && running == 0) { + /* sad situation!! should not happen */ + ERROR("threads are blocked!"); + job = first_job; + first_job = job->next; + } + if (job == NULL) { + /* no job... */ + pthread_cond_wait(&cond, &mutex); + } else { + /* run the job */ + running++; + me->works = 1; + pthread_mutex_unlock(&mutex); + sig_monitor(job->timeout, job_call, job); + pthread_mutex_lock(&mutex); + me->works = 0; + running--; + if (job->group != NULL) + job_unblock(job->group); + free(job); + } + + } + me->ended = 1; + pthread_mutex_unlock(&mutex); + sig_monitor_clean_timeouts(); + return me; +} + +/* start a new thread */ +static int start_one_thread() +{ + struct thread *t; + int rc; + + assert(started < allowed); + + t = &threads[started++]; + t->stop = 0; + rc = pthread_create(&t->tid, NULL, thread_main_loop, t); + if (rc != 0) { + started--; + errno = rc; + WARNING("not able to start thread: %m"); + rc = -1; + } + return rc; +} + +int jobs_queue( + void *group, + int timeout, + void (*callback)(int, void*), + void *arg) +{ + return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg, NULL, NULL); +} + +int jobs_queue2( + void *group, + int timeout, + void (*callback)(int, void*, void*), + void *arg1, + void *arg2) +{ + return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg1, arg2, NULL); +} + +/* queue the job to the 'callback' using a separate thread if available */ +int jobs_queue3( + void *group, + int timeout, + void (*callback)(int, void*, void *, void*), + void *arg1, + void *arg2, + void *arg3) +{ + const char *info; + struct job *job; + int rc; + + /* allocates the job */ + job = malloc(sizeof *job); + if (job == NULL) { + errno = ENOMEM; + info = "out of memory"; + goto error; + } + + /* start a thread if needed */ + pthread_mutex_lock(&mutex); + if (remains == 0) { + errno = EBUSY; + info = "too many jobs"; + goto error2; + } + if (started == running && started < allowed) { + rc = start_one_thread(); + if (rc < 0 && started == 0) { + /* failed to start threading */ + info = "can't start first thread"; + goto error2; + } + } + + /* fills and queues the job */ + job->group = group; + job->timeout = timeout; + job->callback = callback; + job->arg1 = arg1; + job->arg2 = arg2; + job->arg3 = arg3; + job->blocked = 0; + job_add(job); + pthread_mutex_unlock(&mutex); + + /* signal an existing job */ + pthread_cond_signal(&cond); + return 0; + +error2: + pthread_mutex_unlock(&mutex); + free(job); +error: + ERROR("can't process job with threads: %s, %m", info); + return -1; +} + +/* initialise the threads */ +int jobs_init(int allowed_count, int start_count, int waiter_count) +{ + threads = calloc(allowed_count, sizeof *threads); + if (threads == NULL) { + errno = ENOMEM; + ERROR("can't allocate threads"); + return -1; + } + + /* records the allowed count */ + allowed = allowed_count; + started = 0; + running = 0; + remains = waiter_count; + + /* start at least one thread */ + pthread_mutex_lock(&mutex); + while (started < start_count && start_one_thread() == 0); + pthread_mutex_unlock(&mutex); + + /* end */ + return -(started != start_count); +} + +/* terminate all the threads and all pending requests */ +void jobs_terminate() +{ + int i, n; + struct job *job; + + /* request all threads to stop */ + pthread_mutex_lock(&mutex); + allowed = 0; + n = started; + for (i = 0 ; i < n ; i++) + threads[i].stop = 1; + + /* wait until all thread are terminated */ + while (started != 0) { + /* signal threads */ + pthread_mutex_unlock(&mutex); + pthread_cond_broadcast(&cond); + pthread_mutex_lock(&mutex); + + /* join the terminated threads */ + for (i = 0 ; i < n ; i++) { + if (threads[i].tid && threads[i].ended) { + pthread_join(threads[i].tid, NULL); + threads[i].tid = 0; + started--; + } + } + } + pthread_mutex_unlock(&mutex); + free(threads); + + /* cancel pending jobs */ + while (first_job) { + job = first_job; + first_job = job->next; + sig_monitor(0, job_cancel, job); + free(job); + } +} + diff --git a/src/jobs.h b/src/jobs.h new file mode 100644 index 00000000..ef72e0c1 --- /dev/null +++ b/src/jobs.h @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2016, 2017 "IoT.bzh" + * Author José Bollo + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +extern int jobs_queue( + void *group, + int timeout, + void (*callback)(int signum, void* arg), + void *arg); + +extern int jobs_queue2( + void *group, + int timeout, + void (*callback)(int signum, void* arg1, void *arg2), + void *arg1, + void *arg2); + +extern int jobs_queue3( + void *group, + int timeout, + void (*callback)(int signum, void* arg1, void *arg2, void *arg3), + void *arg1, + void *arg2, + void *arg3); + +extern int jobs_init(int allowed_count, int start_count, int waiter_count); +extern void jobs_terminate(); + + diff --git a/src/main.c b/src/main.c index 05805e2d..40ad19c8 100644 --- a/src/main.c +++ b/src/main.c @@ -41,8 +41,8 @@ #include "afb-hsrv.h" #include "afb-context.h" #include "afb-hreq.h" -#include "afb-sig-handler.h" -#include "afb-thread.h" +#include "sig-monitor.h" +#include "jobs.h" #include "afb-session.h" #include "verbose.h" #include "afb-common.h" @@ -439,7 +439,7 @@ int main(int argc, char *argv[]) exit(1); } - if (afb_sig_handler_init() < 0) { + if (sig_monitor_init() < 0) { ERROR("failed to initialise signal handlers"); return 1; } @@ -450,7 +450,7 @@ int main(int argc, char *argv[]) return 1; } - if (afb_thread_init(3, 1, 20) < 0) { + if (jobs_init(3, 1, 20) < 0) { ERROR("failed to initialise threading"); return 1; } diff --git a/src/sig-monitor.c b/src/sig-monitor.c new file mode 100644 index 00000000..d00f0f97 --- /dev/null +++ b/src/sig-monitor.c @@ -0,0 +1,198 @@ +/* + * Copyright (C) 2017 "IoT.bzh" + * Author José Bollo + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include + +#include "sig-monitor.h" +#include "verbose.h" + +/* local handler */ +static _Thread_local sigjmp_buf *error_handler; + +/* local timers */ +static _Thread_local int thread_timer_set; +static _Thread_local timer_t thread_timerid; + +/* + * Creates a timer for the current thread + * + * Returns 0 in case of success + */ +static inline int timeout_create() +{ + int rc; + struct sigevent sevp; + + if (thread_timer_set) + rc = 0; + else { + sevp.sigev_notify = SIGEV_THREAD_ID; + sevp.sigev_signo = SIGALRM; + sevp.sigev_value.sival_ptr = NULL; +#if defined(sigev_notify_thread_id) + sevp.sigev_notify_thread_id = (pid_t)syscall(SYS_gettid); +#else + sevp._sigev_un._tid = (pid_t)syscall(SYS_gettid); +#endif + rc = timer_create(CLOCK_THREAD_CPUTIME_ID, &sevp, &thread_timerid); + thread_timer_set = !rc; + } + return 0; +} + +/* + * Arms the alarm in timeout seconds for the current thread + */ +static inline int timeout_arm(int timeout) +{ + int rc; + struct itimerspec its; + + rc = timeout_create(); + if (rc == 0) { + its.it_interval.tv_sec = 0; + its.it_interval.tv_nsec = 0; + its.it_value.tv_sec = timeout; + its.it_value.tv_nsec = 0; + rc = timer_settime(thread_timerid, 0, &its, NULL); + } + + return rc; +} + +/* + * Disarms the current alarm + */ +static inline void timeout_disarm() +{ + if (thread_timer_set) + timeout_arm(0); +} + +/* + * Destroy any alarm resource for the current thread + */ +static inline void timeout_delete() +{ + if (thread_timer_set) { + timer_delete(thread_timerid); + thread_timer_set = 0; + } +} + + +/* Handles signals that terminate the process */ +static void on_signal_terminate (int signum) +{ + ERROR("Terminating signal %d received: %s", signum, strsignal(signum)); + exit(1); +} + +/* Handles monitored signals that can be continued */ +static void on_signal_error(int signum) +{ + sigset_t sigset; + + // unlock signal to allow a new signal to come + if (error_handler != NULL) { + sigemptyset(&sigset); + sigaddset(&sigset, signum); + sigprocmask(SIG_UNBLOCK, &sigset, 0); + longjmp(*error_handler, signum); + } + if (signum == SIGALRM) + return; + ERROR("Unmonitored signal %d received: %s", signum, strsignal(signum)); + exit(2); +} + +/* install the handlers */ +static int install(void (*handler)(int), int *signals) +{ + int result = 1; + while(*signals > 0) { + if (signal(*signals, handler) == SIG_ERR) { + ERROR("failed to install signal handler for signal %s", strsignal(*signals)); + result = 0; + } + signals++; + } + return result; +} + +int sig_monitor_init() +{ + static int sigerr[] = { SIGALRM, SIGSEGV, SIGFPE, 0 }; + static int sigterm[] = { SIGINT, SIGABRT, 0 }; + + return (install(on_signal_error, sigerr) & install(on_signal_terminate, sigterm)) - 1; +} + +int sig_monitor_init_timeouts() +{ + return timeout_create(); +} + +void sig_monitor_clean_timeouts() +{ + timeout_delete(); +} + +void sig_monitor(int timeout, void (*function)(int sig, void*), void *arg) +{ + sig_monitor3(timeout, (void (*)(int,void*,void*,void*))function, arg, NULL, NULL); +} + +void sig_monitor2(int timeout, void (*function)(int sig, void*, void*), void *arg1, void *arg2) +{ + sig_monitor3(timeout, (void (*)(int,void*,void*,void*))function, arg1, arg2, NULL); +} + +void sig_monitor3(int timeout, void (*function)(int sig, void*, void*, void*), void *arg1, void *arg2, void *arg3) +{ + volatile int signum, signum2; + sigjmp_buf jmpbuf, *older; + + older = error_handler; + signum = setjmp(jmpbuf); + if (signum == 0) { + error_handler = &jmpbuf; + if (timeout) + timeout_arm(timeout); + function(0, arg1, arg2, arg3); + } else { + signum2 = setjmp(jmpbuf); + if (signum2 == 0) + function(signum, arg1, arg2, arg3); + } + error_handler = older; + if (timeout) + timeout_disarm(); +} + + + + + diff --git a/src/afb-sig-handler.h b/src/sig-monitor.h similarity index 57% rename from src/afb-sig-handler.h rename to src/sig-monitor.h index 4f41324d..a3a28bc1 100644 --- a/src/afb-sig-handler.h +++ b/src/sig-monitor.h @@ -1,6 +1,5 @@ /* - * Copyright (C) 2015, 2016, 2017 "IoT.bzh" - * Author "Fulup Ar Foll" + * Copyright (C) 2017 "IoT.bzh" * Author José Bollo * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,11 +17,11 @@ #pragma once -struct afb_req; +extern int sig_monitor_init(); +extern void sig_monitor_clean_timeouts(); +extern int sig_monitor_init_timeouts(); -extern int afb_sig_handler_init(); - -extern void afb_sig_monitor(void (*function)(int sig, void*), void *closure, int timeout); -extern int afb_sig_req(struct afb_req req, void (*callback)(struct afb_req req)); -extern int afb_sig_req_timeout(struct afb_req req, void (*callback)(struct afb_req req), int timeout); +extern void sig_monitor(int timeout, void (*function)(int sig, void*), void *arg); +extern void sig_monitor2(int timeout, void (*function)(int sig, void*, void*), void *arg1, void *arg2); +extern void sig_monitor3(int timeout, void (*function)(int sig, void*, void*, void*), void *arg1, void *arg2, void *arg3); -- 2.16.6