Refactor of threading and signal monitor
authorJosé Bollo <jose.bollo@iot.bzh>
Wed, 22 Mar 2017 15:49:53 +0000 (16:49 +0100)
committerJosé Bollo <jose.bollo@iot.bzh>
Wed, 22 Mar 2017 15:49:53 +0000 (16:49 +0100)
The goal is to allow use of this facilities for
things that are not 'afb_req'.

Change-Id: I0d99c227934ed45136477bf6235bd1541d5f05cf
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
src/CMakeLists.txt
src/afb-api-so.c
src/afb-sig-handler.c [deleted file]
src/afb-thread.c
src/afb-thread.h
src/jobs.c [new file with mode: 0644]
src/jobs.h [new file with mode: 0644]
src/main.c
src/sig-monitor.c [new file with mode: 0644]
src/sig-monitor.h [moved from src/afb-sig-handler.h with 57% similarity]

index 8d0121a..6cbc6fa 100644 (file)
@@ -69,7 +69,6 @@ ADD_LIBRARY(afb-lib STATIC
        afb-method.c
        afb-msg-json.c
        afb-session.c
-       afb-sig-handler.c
        afb-svc.c
        afb-subcall.c
        afb-thread.c
@@ -78,10 +77,12 @@ ADD_LIBRARY(afb-lib STATIC
        afb-ws-json1.c
        afb-ws.c
        afb-wsj1.c
+       jobs.c
        locale-root.c
+       sd-fds.c
+       sig-monitor.c
        verbose.c
        websock.c
-       sd-fds.c
 )
 
 ###########################################
index 74f94f3..222fbbb 100644 (file)
@@ -38,7 +38,6 @@
 #include "afb-context.h"
 #include "afb-apis.h"
 #include "afb-api-so.h"
-#include "afb-sig-handler.h"
 #include "afb-thread.h"
 #include "afb-evt.h"
 #include "afb-svc.h"
@@ -194,12 +193,7 @@ static void call_cb(void *closure, struct afb_req req, struct afb_context *conte
        if (!verb->name)
                afb_req_fail_f(req, "unknown-verb", "verb %.*s unknown within api %s", (int)lenverb, strverb, desc->binding->v1.prefix);
        else if (call_check(req, context, verb)) {
-               if (0)
-                       /* not threaded */
-                       afb_sig_req_timeout(req, verb->callback, api_timeout);
-               else
-                       /* threaded */
-                       afb_thread_call(req, verb->callback, api_timeout, desc);
+               afb_thread_req_call(req, verb->callback, api_timeout, desc);
        }
 }
 
diff --git a/src/afb-sig-handler.c b/src/afb-sig-handler.c
deleted file mode 100644 (file)
index bbf1531..0000000
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Copyright (C) 2015, 2016, 2017 "IoT.bzh"
- * Author "Fulup Ar Foll"
- * Author José Bollo <jose.bollo@iot.bzh>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#define _GNU_SOURCE
-
-#include <stdlib.h>
-#include <signal.h>
-#include <string.h>
-#include <setjmp.h>
-
-#include <afb/afb-req-itf.h>
-
-#include "afb-sig-handler.h"
-#include "afb-thread.h"
-#include "verbose.h"
-
-static _Thread_local sigjmp_buf *error_handler;
-
-static void on_signal_terminate (int signum)
-{
-       ERROR("Terminating signal received %s", strsignal(signum));
-       exit(1);
-}
-
-static void on_signal_error(int signum)
-{
-       sigset_t sigset;
-
-       // unlock signal to allow a new signal to come
-       if (error_handler != NULL) {
-               sigemptyset(&sigset);
-               sigaddset(&sigset, signum);
-               sigprocmask(SIG_UNBLOCK, &sigset, 0);
-               longjmp(*error_handler, signum);
-       }
-       if (signum == SIGALRM)
-               return;
-       ERROR("Unmonitored signal received %s", strsignal(signum));
-       exit(2);
-}
-
-static int install(void (*handler)(int), int *signals)
-{
-       int result = 1;
-       while(*signals > 0) {
-               if (signal(*signals, handler) == SIG_ERR) {
-                       ERROR("failed to install signal handler for signal %s", strsignal(*signals));
-                       result = 0;
-               }
-               signals++;
-       }
-       return result;
-}
-
-int afb_sig_handler_init()
-{
-       static int sigerr[] = { SIGALRM, SIGSEGV, SIGFPE, 0 };
-       static int sigterm[] = { SIGINT, SIGABRT, 0 };
-
-       return (install(on_signal_error, sigerr) & install(on_signal_terminate, sigterm)) - 1;
-}
-
-int afb_sig_req(struct afb_req req, void (*callback)(struct afb_req req))
-{
-       volatile int signum;
-       sigjmp_buf jmpbuf, *older;
-
-       older = error_handler;
-       signum = setjmp(jmpbuf);
-       if (signum != 0)
-               afb_req_fail_f(req, "aborted", "signal %s(%d) caught", strsignal(signum), signum);
-       else {
-               error_handler = &jmpbuf;
-               callback(req);
-       }
-       error_handler = older;
-       return signum;
-}
-
-int afb_sig_req_timeout(struct afb_req req, void (*callback)(struct afb_req req), int timeout)
-{
-       int rc;
-
-       if (timeout)
-               afb_thread_timer_arm(timeout);
-       rc = afb_sig_req(req, callback);
-       afb_thread_timer_disarm();
-       return rc;
-}
-
-void afb_sig_monitor(void (*function)(int sig, void*), void *closure, int timeout)
-{
-       volatile int signum;
-       sigjmp_buf jmpbuf, *older;
-
-       older = error_handler;
-       signum = setjmp(jmpbuf);
-       if (signum != 0) {
-               function(signum, closure);
-       }
-       else {
-               error_handler = &jmpbuf;
-               if (timeout)
-                       afb_thread_timer_arm(timeout);
-               function(0, closure);
-       }
-       afb_thread_timer_disarm();
-       error_handler = older;
-}
-
index 790b86b..67870ce 100644 (file)
 
 #define _GNU_SOURCE
 
-#include <stdlib.h>
-#include <unistd.h>
-#include <signal.h>
-#include <time.h>
-#include <sys/syscall.h>
-#include <pthread.h>
-#include <errno.h>
-#include <assert.h>
+#include <string.h>
 
 #include <afb/afb-req-itf.h>
 
 #include "afb-thread.h"
-#include "afb-sig-handler.h"
+#include "jobs.h"
+#include "sig-monitor.h"
 #include "verbose.h"
 
-/* control of threads */
-struct thread
+static void req_call(int signum, void *arg1, void *arg2, void *arg3)
 {
-       pthread_t tid;     /* the thread id */
-       unsigned stop: 1;  /* stop request */
-       unsigned ended: 1; /* ended status */
-       unsigned works: 1; /* is it processing a job? */
-};
+       struct afb_req req = { .itf = arg1, .closure = arg2 };
+       void (*callback)(struct afb_req) = arg3;
 
-/* describes pending job */
-struct job
-{
-       void (*callback)(struct afb_req req); /* processing callback */
-       struct afb_req req; /* request to be processed */
-       int timeout;        /* timeout in second for processing the request */
-       int blocked;        /* is an other request blocking this one ? */
-       void *group;        /* group of the request */
-       struct job *next;   /* link to the next job enqueued */
-};
-
-/* synchronisation of threads */
-static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
-
-/* queue of pending jobs */
-static struct job *first_job = NULL;
-
-/* count allowed, started and running threads */
-static int allowed = 0;
-static int started = 0;
-static int running = 0;
-static int remains = 0;
-
-/* list of threads */
-static struct thread *threads = NULL;
-
-/* local timers */
-static _Thread_local int thread_timer_set;
-static _Thread_local timer_t thread_timerid;
-
-/*
- * Creates a timer for the current thread
- *
- * Returns 0 in case of success
- */
-int afb_thread_timer_create()
-{
-       int rc;
-       struct sigevent sevp;
-
-       if (thread_timer_set)
-               rc = 0;
-       else {
-               sevp.sigev_notify = SIGEV_THREAD_ID;
-               sevp.sigev_signo = SIGALRM;
-               sevp.sigev_value.sival_ptr = NULL;
-#if defined(sigev_notify_thread_id)
-               sevp.sigev_notify_thread_id = (pid_t)syscall(SYS_gettid);
-#else
-               sevp._sigev_un._tid = (pid_t)syscall(SYS_gettid);
-#endif
-               rc = timer_create(CLOCK_THREAD_CPUTIME_ID, &sevp, &thread_timerid);
-               thread_timer_set = !rc;
-       }
-       return 0;
+       if (signum != 0)
+               afb_req_fail_f(req, "aborted", "signal %s(%d) caught", strsignal(signum), signum);
+       else
+               callback(req);
+       afb_req_unref(req);
 }
 
-/*
- * Arms the alarm in timeout seconds for the current thread
- */
-int afb_thread_timer_arm(int timeout)
+void afb_thread_req_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group)
 {
        int rc;
-       struct itimerspec its;
-
-       rc = afb_thread_timer_create();
-       if (rc == 0) {
-               its.it_interval.tv_sec = 0;
-               its.it_interval.tv_nsec = 0;
-               its.it_value.tv_sec = timeout;
-               its.it_value.tv_nsec = 0;
-               rc = timer_settime(thread_timerid, 0, &its, NULL);
-       }
 
-       return rc;
-}
-
-/*
- * Disarms the current alarm
- */
-void afb_thread_timer_disarm()
-{
-       if (thread_timer_set)
-               afb_thread_timer_arm(0);
-}
-
-/*
- * Delstroy any alarm resource for the current thread
- */
-void afb_thread_timer_delete()
-{
-       if (thread_timer_set) {
-               timer_delete(thread_timerid);
-               thread_timer_set = 0;
-       }
-}
-
-/* add the job to the list */
-static inline void job_add(struct job *job)
-{
-       void *group = job->group;
-       struct job *ijob, **pjob;
-
-       pjob = &first_job;
-       ijob = first_job;
-       group = job->group;
-       if (group == NULL)
-               group = job;
-       while (ijob) {
-               if (ijob->group == group)
-                       job->blocked = 1;
-               pjob = &ijob->next;
-               ijob = ijob->next;
-       }
-       *pjob = job;
-       job->next = NULL;
-       remains--;
-}
-
-/* get the next job to process or NULL if none */
-static inline struct job *job_get()
-{
-       struct job *job, **pjob;
-       pjob = &first_job;
-       job = first_job;
-       while (job && job->blocked) {
-               pjob = &job->next;
-               job = job->next;
-       }
-       if (job) {
-               *pjob = job->next;
-               remains++;
-       }
-       return job;
-}
-
-/* unblock a group of job */
-static inline void job_unblock(void *group)
-{
-       struct job *job;
-
-       job = first_job;
-       while (job) {
-               if (job->group == group) {
-                       job->blocked = 0;
-                       break;
-               }
-               job = job->next;
-       }
-}
-
-/* main loop of processing threads */
-static void *thread_main_loop(void *data)
-{
-       struct thread *me = data;
-       struct job *job, j;
-
-       me->works = 0;
-       me->ended = 0;
-       afb_thread_timer_create();
-       pthread_mutex_lock(&mutex);
-       while (!me->stop) {
-               /* get a job */
-               job = job_get();
-               if (job == NULL && first_job != NULL && running == 0) {
-                       /* sad situation!! should not happen */
-                       ERROR("threads are blocked!");
-                       job = first_job;
-                       first_job = job->next;
-               }
-               if (job == NULL) {
-                       /* no job... */
-                       pthread_cond_wait(&cond, &mutex);
-               } else {
-                       /* run the job */
-                       running++;
-                       me->works = 1;
-                       pthread_mutex_unlock(&mutex);
-                       j = *job;
-                       free(job);
-                       afb_thread_timer_arm(j.timeout);
-                       afb_sig_req(j.req, j.callback);
-                       afb_thread_timer_disarm();
-                       afb_req_unref(j.req);
-                       pthread_mutex_lock(&mutex);
-                       if (j.group != NULL)
-                               job_unblock(j.group);
-                       me->works = 0;
-                       running--;
-               }
-
-       }
-       me->ended = 1;
-       pthread_mutex_unlock(&mutex);
-       afb_thread_timer_delete();
-       return me;
-}
-
-/* start a new thread */
-static int start_one_thread()
-{
-       struct thread *t;
-       int rc;
-
-       assert(started < allowed);
-
-       t = &threads[started++];
-       t->stop = 0;
-       rc = pthread_create(&t->tid, NULL, thread_main_loop, t);
-       if (rc != 0) {
-               started--;
-               errno = rc;
-               WARNING("not able to start thread: %m");
-               rc = -1;
-       }
-       return rc;
-}
-
-/* process the 'request' with the 'callback' using a separate thread if available */
-void afb_thread_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group)
-{
-       const char *info;
-       struct job *job;
-       int rc;
-
-       /* allocates the job */
-       job = malloc(sizeof *job);
-       if (job == NULL) {
-               info = "out of memory";
-               goto error;
-       }
-
-       /* start a thread if needed */
-       pthread_mutex_lock(&mutex);
-       if (remains == 0) {
-               info = "too many jobs";
-               goto error2;
-       }
-       if (started == running && started < allowed) {
-               rc = start_one_thread();
-               if (rc < 0 && started == 0) {
-                       /* failed to start threading */
-                       info = "can't start thread";
-                       goto error2;
-               }
-       }
-
-       /* fills and queues the job */
-       job->callback = callback;
-       job->req = req;
-       job->timeout = timeout;
-       job->blocked = 0;
-       job->group = group;
        afb_req_addref(req);
-       job_add(job);
-       pthread_mutex_unlock(&mutex);
-
-       /* signal an existing job */
-       pthread_cond_signal(&cond);
-       return;
-
-error2:
-       pthread_mutex_unlock(&mutex);
-       free(job);
-error:
-       ERROR("can't process job with threads: %s", info);
-       afb_req_fail(req, "internal-error", info);
-}
-
-/* initialise the threads */
-int afb_thread_init(int allowed_count, int start_count, int waiter_count)
-{
-       threads = calloc(allowed_count, sizeof *threads);
-       if (threads == NULL) {
-               errno = ENOMEM;
-               ERROR("can't allocate threads");
-               return -1;
-       }
-
-       /* records the allowed count */
-       allowed = allowed_count;
-       started = 0;
-       running = 0;
-       remains = waiter_count;
-
-       /* start at least one thread */
-       pthread_mutex_lock(&mutex);
-       while (started < start_count && start_one_thread() == 0);
-       pthread_mutex_unlock(&mutex);
-
-       /* end */
-       return -(started != start_count);
-}
-
-/* terminate all the threads and all pending requests */
-void afb_thread_terminate()
-{
-       int i, n;
-       struct job *job;
-
-       /* request all threads to stop */
-       pthread_mutex_lock(&mutex);
-       allowed = 0;
-       n = started;
-       for (i = 0 ; i < n ; i++)
-               threads[i].stop = 1;
-
-       /* wait until all thread are terminated */
-       while (started != 0) {
-               /* signal threads */
-               pthread_mutex_unlock(&mutex);
-               pthread_cond_broadcast(&cond);
-               pthread_mutex_lock(&mutex);
-
-               /* join the terminated threads */
-               for (i = 0 ; i < n ; i++) {
-                       if (threads[i].tid && threads[i].ended) {
-                               pthread_join(threads[i].tid, NULL);
-                               threads[i].tid = 0;
-                               started--;
-                       }
+       if (0) {
+               /* no threading */
+               sig_monitor3(timeout, req_call, (void*)req.itf, req.closure, callback);
+       } else {
+               /* threading */
+               rc = jobs_queue3(group, timeout, req_call, (void*)req.itf, req.closure, callback);
+               if (rc < 0) {
+                       /* TODO: allows or not to proccess it directly as when no threading? (see above) */
+                       ERROR("can't process job with threads: %m");
+                       afb_req_fail_f(req, "cancelled", "not able to pipe a job for the task");
+                       afb_req_unref(req);
                }
        }
-       pthread_mutex_unlock(&mutex);
-       free(threads);
-
-       /* cancel pending jobs */
-       while (first_job) {
-               job = first_job;
-               first_job = job->next;
-               afb_req_fail(job->req, "aborted", "termination of threading");
-               afb_req_unref(job->req);
-               free(job);
-       }
 }
+
index 0559dfc..4e44b55 100644 (file)
 
 struct afb_req;
 
-extern void afb_thread_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group);
-
-extern int afb_thread_init(int allowed_count, int start_count, int waiter_count);
-extern void afb_thread_terminate();
-
-extern int afb_thread_timer_create();
-extern int afb_thread_timer_arm(int timeout);
-extern void afb_thread_timer_disarm();
-extern void afb_thread_timer_delete();
+extern void afb_thread_req_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group);
 
diff --git a/src/jobs.c b/src/jobs.c
new file mode 100644 (file)
index 0000000..c4f3224
--- /dev/null
@@ -0,0 +1,344 @@
+/*
+ * Copyright (C) 2016, 2017 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <signal.h>
+#include <time.h>
+#include <sys/syscall.h>
+#include <pthread.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "jobs.h"
+#include "sig-monitor.h"
+#include "verbose.h"
+
+/* control of threads */
+struct thread
+{
+       pthread_t tid;     /* the thread id */
+       unsigned stop: 1;  /* stop request */
+       unsigned ended: 1; /* ended status */
+       unsigned works: 1; /* is it processing a job? */
+};
+
+/* describes pending job */
+struct job
+{
+       struct job *next;   /* link to the next job enqueued */
+       void *group;        /* group of the request */
+       void (*callback)(int,void*,void*,void*);     /* processing callback */
+       void *arg1;         /* first arg */
+       void *arg2;         /* second arg */
+       void *arg3;         /* second arg */
+       int timeout;        /* timeout in second for processing the request */
+       int blocked;        /* is an other request blocking this one ? */
+};
+
+/* synchronisation of threads */
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
+
+/* queue of pending jobs */
+static struct job *first_job = NULL;
+
+/* count allowed, started and running threads */
+static int allowed = 0;
+static int started = 0;
+static int running = 0;
+static int remains = 0;
+
+/* list of threads */
+static struct thread *threads = NULL;
+
+/* add the job to the list */
+static inline void job_add(struct job *job)
+{
+       void *group = job->group;
+       struct job *ijob, **pjob;
+
+       pjob = &first_job;
+       ijob = first_job;
+       group = job->group ? : job;
+       while (ijob) {
+               if (ijob->group == group)
+                       job->blocked = 1;
+               pjob = &ijob->next;
+               ijob = ijob->next;
+       }
+       *pjob = job;
+       job->next = NULL;
+       remains--;
+}
+
+/* get the next job to process or NULL if none */
+static inline struct job *job_get()
+{
+       struct job *job, **pjob;
+       pjob = &first_job;
+       job = first_job;
+       while (job && job->blocked) {
+               pjob = &job->next;
+               job = job->next;
+       }
+       if (job) {
+               *pjob = job->next;
+               remains++;
+       }
+       return job;
+}
+
+/* unblock a group of job */
+static inline void job_unblock(void *group)
+{
+       struct job *job;
+
+       job = first_job;
+       while (job) {
+               if (job->group == group) {
+                       job->blocked = 0;
+                       break;
+               }
+               job = job->next;
+       }
+}
+
+/* call the job */
+static inline void job_call(int signum, void *arg)
+{
+       struct job *job = arg;
+       job->callback(signum, job->arg1, job->arg2, job->arg3);
+}
+
+/* cancel the job */
+static inline void job_cancel(int signum, void *arg)
+{
+       struct job *job = arg;
+       job->callback(SIGABRT, job->arg1, job->arg2, job->arg3);
+}
+
+/* main loop of processing threads */
+static void *thread_main_loop(void *data)
+{
+       struct thread *me = data;
+       struct job *job;
+
+       me->works = 0;
+       me->ended = 0;
+       sig_monitor_init_timeouts();
+       pthread_mutex_lock(&mutex);
+       while (!me->stop) {
+               /* get a job */
+               job = job_get();
+               if (job == NULL && first_job != NULL && running == 0) {
+                       /* sad situation!! should not happen */
+                       ERROR("threads are blocked!");
+                       job = first_job;
+                       first_job = job->next;
+               }
+               if (job == NULL) {
+                       /* no job... */
+                       pthread_cond_wait(&cond, &mutex);
+               } else {
+                       /* run the job */
+                       running++;
+                       me->works = 1;
+                       pthread_mutex_unlock(&mutex);
+                       sig_monitor(job->timeout, job_call, job);
+                       pthread_mutex_lock(&mutex);
+                       me->works = 0;
+                       running--;
+                       if (job->group != NULL)
+                               job_unblock(job->group);
+                       free(job);
+               }
+
+       }
+       me->ended = 1;
+       pthread_mutex_unlock(&mutex);
+       sig_monitor_clean_timeouts();
+       return me;
+}
+
+/* start a new thread */
+static int start_one_thread()
+{
+       struct thread *t;
+       int rc;
+
+       assert(started < allowed);
+
+       t = &threads[started++];
+       t->stop = 0;
+       rc = pthread_create(&t->tid, NULL, thread_main_loop, t);
+       if (rc != 0) {
+               started--;
+               errno = rc;
+               WARNING("not able to start thread: %m");
+               rc = -1;
+       }
+       return rc;
+}
+
+int jobs_queue(
+               void *group,
+               int timeout,
+               void (*callback)(int, void*),
+               void *arg)
+{
+       return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg, NULL, NULL);
+}
+
+int jobs_queue2(
+               void *group,
+               int timeout,
+               void (*callback)(int, void*, void*),
+               void *arg1,
+               void *arg2)
+{
+       return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg1, arg2, NULL);
+}
+
+/* queue the job to the 'callback' using a separate thread if available */
+int jobs_queue3(
+               void *group,
+               int timeout,
+               void (*callback)(int, void*, void *, void*),
+               void *arg1,
+               void *arg2,
+               void *arg3)
+{
+       const char *info;
+       struct job *job;
+       int rc;
+
+       /* allocates the job */
+       job = malloc(sizeof *job);
+       if (job == NULL) {
+               errno = ENOMEM;
+               info = "out of memory";
+               goto error;
+       }
+
+       /* start a thread if needed */
+       pthread_mutex_lock(&mutex);
+       if (remains == 0) {
+               errno = EBUSY;
+               info = "too many jobs";
+               goto error2;
+       }
+       if (started == running && started < allowed) {
+               rc = start_one_thread();
+               if (rc < 0 && started == 0) {
+                       /* failed to start threading */
+                       info = "can't start first thread";
+                       goto error2;
+               }
+       }
+
+       /* fills and queues the job */
+       job->group = group;
+       job->timeout = timeout;
+       job->callback = callback;
+       job->arg1 = arg1;
+       job->arg2 = arg2;
+       job->arg3 = arg3;
+       job->blocked = 0;
+       job_add(job);
+       pthread_mutex_unlock(&mutex);
+
+       /* signal an existing job */
+       pthread_cond_signal(&cond);
+       return 0;
+
+error2:
+       pthread_mutex_unlock(&mutex);
+       free(job);
+error:
+       ERROR("can't process job with threads: %s, %m", info);
+       return -1;
+}
+
+/* initialise the threads */
+int jobs_init(int allowed_count, int start_count, int waiter_count)
+{
+       threads = calloc(allowed_count, sizeof *threads);
+       if (threads == NULL) {
+               errno = ENOMEM;
+               ERROR("can't allocate threads");
+               return -1;
+       }
+
+       /* records the allowed count */
+       allowed = allowed_count;
+       started = 0;
+       running = 0;
+       remains = waiter_count;
+
+       /* start at least one thread */
+       pthread_mutex_lock(&mutex);
+       while (started < start_count && start_one_thread() == 0);
+       pthread_mutex_unlock(&mutex);
+
+       /* end */
+       return -(started != start_count);
+}
+
+/* terminate all the threads and all pending requests */
+void jobs_terminate()
+{
+       int i, n;
+       struct job *job;
+
+       /* request all threads to stop */
+       pthread_mutex_lock(&mutex);
+       allowed = 0;
+       n = started;
+       for (i = 0 ; i < n ; i++)
+               threads[i].stop = 1;
+
+       /* wait until all thread are terminated */
+       while (started != 0) {
+               /* signal threads */
+               pthread_mutex_unlock(&mutex);
+               pthread_cond_broadcast(&cond);
+               pthread_mutex_lock(&mutex);
+
+               /* join the terminated threads */
+               for (i = 0 ; i < n ; i++) {
+                       if (threads[i].tid && threads[i].ended) {
+                               pthread_join(threads[i].tid, NULL);
+                               threads[i].tid = 0;
+                               started--;
+                       }
+               }
+       }
+       pthread_mutex_unlock(&mutex);
+       free(threads);
+
+       /* cancel pending jobs */
+       while (first_job) {
+               job = first_job;
+               first_job = job->next;
+               sig_monitor(0, job_cancel, job);
+               free(job);
+       }
+}
+
diff --git a/src/jobs.h b/src/jobs.h
new file mode 100644 (file)
index 0000000..ef72e0c
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2016, 2017 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+extern int jobs_queue(
+               void *group,
+               int timeout,
+               void (*callback)(int signum, void* arg),
+               void *arg);
+
+extern int jobs_queue2(
+               void *group,
+               int timeout,
+               void (*callback)(int signum, void* arg1, void *arg2),
+               void *arg1,
+               void *arg2);
+
+extern int jobs_queue3(
+               void *group,
+               int timeout,
+               void (*callback)(int signum, void* arg1, void *arg2, void *arg3),
+               void *arg1,
+               void *arg2,
+               void *arg3);
+
+extern int jobs_init(int allowed_count, int start_count, int waiter_count);
+extern void jobs_terminate();
+
+
index 05805e2..40ad19c 100644 (file)
@@ -41,8 +41,8 @@
 #include "afb-hsrv.h"
 #include "afb-context.h"
 #include "afb-hreq.h"
-#include "afb-sig-handler.h"
-#include "afb-thread.h"
+#include "sig-monitor.h"
+#include "jobs.h"
 #include "afb-session.h"
 #include "verbose.h"
 #include "afb-common.h"
@@ -439,7 +439,7 @@ int main(int argc, char *argv[])
                exit(1);
        }
 
-       if (afb_sig_handler_init() < 0) {
+       if (sig_monitor_init() < 0) {
                ERROR("failed to initialise signal handlers");
                return 1;
        }
@@ -450,7 +450,7 @@ int main(int argc, char *argv[])
                return 1;
        }
 
-       if (afb_thread_init(3, 1, 20) < 0) {
+       if (jobs_init(3, 1, 20) < 0) {
                ERROR("failed to initialise threading");
                return 1;
        }
diff --git a/src/sig-monitor.c b/src/sig-monitor.c
new file mode 100644 (file)
index 0000000..d00f0f9
--- /dev/null
@@ -0,0 +1,198 @@
+/*
+ * Copyright (C) 2017 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <stdlib.h>
+#include <signal.h>
+#include <string.h>
+#include <setjmp.h>
+#include <time.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+
+#include "sig-monitor.h"
+#include "verbose.h"
+
+/* local handler */
+static _Thread_local sigjmp_buf *error_handler;
+
+/* local timers */
+static _Thread_local int thread_timer_set;
+static _Thread_local timer_t thread_timerid;
+
+/*
+ * Creates a timer for the current thread
+ *
+ * Returns 0 in case of success
+ */
+static inline int timeout_create()
+{
+       int rc;
+       struct sigevent sevp;
+
+       if (thread_timer_set)
+               rc = 0;
+       else {
+               sevp.sigev_notify = SIGEV_THREAD_ID;
+               sevp.sigev_signo = SIGALRM;
+               sevp.sigev_value.sival_ptr = NULL;
+#if defined(sigev_notify_thread_id)
+               sevp.sigev_notify_thread_id = (pid_t)syscall(SYS_gettid);
+#else
+               sevp._sigev_un._tid = (pid_t)syscall(SYS_gettid);
+#endif
+               rc = timer_create(CLOCK_THREAD_CPUTIME_ID, &sevp, &thread_timerid);
+               thread_timer_set = !rc;
+       }
+       return 0;
+}
+
+/*
+ * Arms the alarm in timeout seconds for the current thread
+ */
+static inline int timeout_arm(int timeout)
+{
+       int rc;
+       struct itimerspec its;
+
+       rc = timeout_create();
+       if (rc == 0) {
+               its.it_interval.tv_sec = 0;
+               its.it_interval.tv_nsec = 0;
+               its.it_value.tv_sec = timeout;
+               its.it_value.tv_nsec = 0;
+               rc = timer_settime(thread_timerid, 0, &its, NULL);
+       }
+
+       return rc;
+}
+
+/*
+ * Disarms the current alarm
+ */
+static inline void timeout_disarm()
+{
+       if (thread_timer_set)
+               timeout_arm(0);
+}
+
+/*
+ * Destroy any alarm resource for the current thread
+ */
+static inline void timeout_delete()
+{
+       if (thread_timer_set) {
+               timer_delete(thread_timerid);
+               thread_timer_set = 0;
+       }
+}
+
+
+/* Handles signals that terminate the process */
+static void on_signal_terminate (int signum)
+{
+       ERROR("Terminating signal %d received: %s", signum, strsignal(signum));
+       exit(1);
+}
+
+/* Handles monitored signals that can be continued */
+static void on_signal_error(int signum)
+{
+       sigset_t sigset;
+
+       // unlock signal to allow a new signal to come
+       if (error_handler != NULL) {
+               sigemptyset(&sigset);
+               sigaddset(&sigset, signum);
+               sigprocmask(SIG_UNBLOCK, &sigset, 0);
+               longjmp(*error_handler, signum);
+       }
+       if (signum == SIGALRM)
+               return;
+       ERROR("Unmonitored signal %d received: %s", signum, strsignal(signum));
+       exit(2);
+}
+
+/* install the handlers */
+static int install(void (*handler)(int), int *signals)
+{
+       int result = 1;
+       while(*signals > 0) {
+               if (signal(*signals, handler) == SIG_ERR) {
+                       ERROR("failed to install signal handler for signal %s", strsignal(*signals));
+                       result = 0;
+               }
+               signals++;
+       }
+       return result;
+}
+
+int sig_monitor_init()
+{
+       static int sigerr[] = { SIGALRM, SIGSEGV, SIGFPE, 0 };
+       static int sigterm[] = { SIGINT, SIGABRT, 0 };
+
+       return (install(on_signal_error, sigerr) & install(on_signal_terminate, sigterm)) - 1;
+}
+
+int sig_monitor_init_timeouts()
+{
+       return timeout_create();
+}
+
+void sig_monitor_clean_timeouts()
+{
+       timeout_delete();
+}
+
+void sig_monitor(int timeout, void (*function)(int sig, void*), void *arg)
+{
+       sig_monitor3(timeout, (void (*)(int,void*,void*,void*))function, arg, NULL, NULL);
+}
+
+void sig_monitor2(int timeout, void (*function)(int sig, void*, void*), void *arg1, void *arg2)
+{
+       sig_monitor3(timeout, (void (*)(int,void*,void*,void*))function, arg1, arg2, NULL);
+}
+
+void sig_monitor3(int timeout, void (*function)(int sig, void*, void*, void*), void *arg1, void *arg2, void *arg3)
+{
+       volatile int signum, signum2;
+       sigjmp_buf jmpbuf, *older;
+
+       older = error_handler;
+       signum = setjmp(jmpbuf);
+       if (signum == 0) {
+               error_handler = &jmpbuf;
+               if (timeout)
+                       timeout_arm(timeout);
+               function(0, arg1, arg2, arg3);
+       } else {
+               signum2 = setjmp(jmpbuf);
+               if (signum2 == 0)
+                       function(signum, arg1, arg2, arg3);
+       }
+       error_handler = older;
+       if (timeout)
+               timeout_disarm();
+}
+
+
+
+
+
similarity index 57%
rename from src/afb-sig-handler.h
rename to src/sig-monitor.h
index 4f41324..a3a28bc 100644 (file)
@@ -1,6 +1,5 @@
 /*
- * Copyright (C) 2015, 2016, 2017 "IoT.bzh"
- * Author "Fulup Ar Foll"
+ * Copyright (C) 2017 "IoT.bzh"
  * Author José Bollo <jose.bollo@iot.bzh>
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
 
 #pragma once
 
-struct afb_req;
+extern int sig_monitor_init();
+extern void sig_monitor_clean_timeouts();
+extern int sig_monitor_init_timeouts();
 
-extern int afb_sig_handler_init();
-
-extern void afb_sig_monitor(void (*function)(int sig, void*), void *closure, int timeout);
-extern int afb_sig_req(struct afb_req req, void (*callback)(struct afb_req req));
-extern int afb_sig_req_timeout(struct afb_req req, void (*callback)(struct afb_req req), int timeout);
+extern void sig_monitor(int timeout, void (*function)(int sig, void*), void *arg);
+extern void sig_monitor2(int timeout, void (*function)(int sig, void*, void*), void *arg1, void *arg2);
+extern void sig_monitor3(int timeout, void (*function)(int sig, void*, void*, void*), void *arg1, void *arg2, void *arg3);