Threads: handles request with threads
authorJosé Bollo <jose.bollo@iot.bzh>
Wed, 3 Aug 2016 18:04:08 +0000 (20:04 +0200)
committerJosé Bollo <jose.bollo@iot.bzh>
Mon, 10 Oct 2016 15:33:32 +0000 (17:33 +0200)
This implementation handles all requests with threads.
Later implementation could add a mechanism to choose
what request will be handled by threads.

Each API receive its requests in serial order without
reentrancy. Here again, this can change in the future
if a choice is possible to allow reentrant calls.

The signal/event are not processed using threads
in this version. It may change in the future.

Signed-off-by: José Bollo <jose.bollo@iot.bzh>
src/CMakeLists.txt
src/afb-api-so.c
src/afb-sig-handler.c
src/afb-sig-handler.h
src/afb-thread.c [new file with mode: 0644]
src/afb-thread.h [new file with mode: 0644]
src/main.c
src/tests/test-thread.c [new file with mode: 0644]
src/tests/test-thread.sh [new file with mode: 0755]

index 9c13cc1..94e0a93 100644 (file)
@@ -48,6 +48,7 @@ ADD_LIBRARY(afb-lib STATIC
        afb-sig-handler.c
        afb-svc.c
        afb-subcall.c
+       afb-thread.c
        afb-websock.c
        afb-ws-client.c
        afb-ws-json1.c
index ef1bbcc..4bb6087 100644 (file)
@@ -38,6 +38,7 @@
 #include "afb-apis.h"
 #include "afb-api-so.h"
 #include "afb-sig-handler.h"
+#include "afb-thread.h"
 #include "afb-evt.h"
 #include "afb-svc.h"
 #include "verbose.h"
@@ -139,33 +140,22 @@ static int afb_api_so_rootdir_open_locale(void *closure, const char *filename, i
        return afb_common_rootdir_open_locale(filename, flags, locale);
 }
 
-static void monitored_call(int signum, void *arg)
+static int call_check(struct afb_req req, struct afb_context *context, const struct afb_verb_desc_v1 *verb)
 {
-       struct monitoring *data = arg;
-       if (signum != 0)
-               afb_req_fail_f(data->req, "aborted", "signal %s(%d) caught", strsignal(signum), signum);
-       else
-               data->action(data->req);
-}
-
-static void call_check(struct afb_req req, struct afb_context *context, const struct afb_verb_desc_v1 *verb)
-{
-       struct monitoring data;
-
        int stag = (int)verb->session;
 
        if ((stag & (AFB_SESSION_CREATE|AFB_SESSION_CLOSE|AFB_SESSION_RENEW|AFB_SESSION_CHECK|AFB_SESSION_LOA_EQ)) != 0) {
                if (!afb_context_check(context)) {
                        afb_context_close(context);
                        afb_req_fail(req, "failed", "invalid token's identity");
-                       return;
+                       return 0;
                }
        }
 
        if ((stag & AFB_SESSION_CREATE) != 0) {
                if (afb_context_check_loa(context, 1)) {
                        afb_req_fail(req, "failed", "invalid creation state");
-                       return;
+                       return 0;
                }
                afb_context_change_loa(context, 1);
                afb_context_refresh(context);
@@ -183,7 +173,7 @@ static void call_check(struct afb_req req, struct afb_context *context, const st
                int loa = (stag >> AFB_SESSION_LOA_SHIFT) & AFB_SESSION_LOA_MASK;
                if (!afb_context_check_loa(context, loa)) {
                        afb_req_fail(req, "failed", "invalid LOA");
-                       return;
+                       return 0;
                }
        }
 
@@ -191,27 +181,30 @@ static void call_check(struct afb_req req, struct afb_context *context, const st
                int loa = (stag >> AFB_SESSION_LOA_SHIFT) & AFB_SESSION_LOA_MASK;
                if (afb_context_check_loa(context, loa + 1)) {
                        afb_req_fail(req, "failed", "invalid LOA");
-                       return;
+                       return 0;
                }
        }
-
-       data.req = req;
-       data.action = verb->callback;
-       afb_sig_monitor(monitored_call, &data, api_timeout);
+       return 1;
 }
 
-static void call_cb(void *closure, struct afb_req req, struct afb_context *context, const char *verb, size_t lenverb)
+static void call_cb(void *closure, struct afb_req req, struct afb_context *context, const char *strverb, size_t lenverb)
 {
-       const struct afb_verb_desc_v1 *v;
+       const struct afb_verb_desc_v1 *verb;
        struct api_so_desc *desc = closure;
 
-       v = desc->binding->v1.verbs;
-       while (v->name && (strncasecmp(v->name, verb, lenverb) || v->name[lenverb]))
-               v++;
-       if (v->name)
-               call_check(req, context, v);
-       else
-               afb_req_fail_f(req, "unknown-verb", "verb %.*s unknown within api %s", (int)lenverb, verb, desc->binding->v1.prefix);
+       verb = desc->binding->v1.verbs;
+       while (verb->name && (strncasecmp(verb->name, strverb, lenverb) || verb->name[lenverb]))
+               verb++;
+       if (!verb->name)
+               afb_req_fail_f(req, "unknown-verb", "verb %.*s unknown within api %s", (int)lenverb, strverb, desc->binding->v1.prefix);
+       else if (call_check(req, context, verb)) {
+               if (0)
+                       /* not threaded */
+                       afb_sig_req_timeout(req, verb->callback, api_timeout);
+               else
+                       /* threaded */
+                       afb_thread_call(req, verb->callback, api_timeout, desc);
+       }
 }
 
 static int service_start_cb(void *closure, int share_session, int onneed)
index ad56392..25a5437 100644 (file)
 #include <stdlib.h>
 #include <signal.h>
 #include <string.h>
-#include <unistd.h>
-#include <time.h>
-#include <sys/syscall.h>
 #include <setjmp.h>
 
+#include <afb/afb-req-itf.h>
+
 #include "afb-sig-handler.h"
+#include "afb-thread.h"
 #include "verbose.h"
 
 static _Thread_local sigjmp_buf *error_handler;
@@ -75,15 +75,39 @@ int afb_sig_handler_init()
        return (install(on_signal_error, sigerr) & install(on_signal_terminate, sigterm)) - 1;
 }
 
+int afb_sig_req(struct afb_req req, void (*callback)(struct afb_req req))
+{
+       volatile int signum;
+       sigjmp_buf jmpbuf, *older;
+
+       older = error_handler;
+       signum = setjmp(jmpbuf);
+       if (signum != 0)
+               afb_req_fail_f(req, "aborted", "signal %s(%d) caught", strsignal(signum), signum);
+       else {
+               error_handler = &jmpbuf;
+               callback(req);
+       }
+       error_handler = older;
+       return signum;
+}
+
+int afb_sig_req_timeout(struct afb_req req, void (*callback)(struct afb_req req), int timeout)
+{
+       int rc;
+
+       if (timeout)
+               afb_thread_timer_arm(timeout);
+       rc = afb_sig_req(req, callback);
+       afb_thread_timer_disarm();
+       return rc;
+}
+
 void afb_sig_monitor(void (*function)(int sig, void*), void *closure, int timeout)
 {
-       volatile int signum, timerset;
-       timer_t timerid;
+       volatile int signum;
        sigjmp_buf jmpbuf, *older;
-       struct sigevent sevp;
-       struct itimerspec its;
 
-       timerset = 0;
        older = error_handler;
        signum = setjmp(jmpbuf);
        if (signum != 0) {
@@ -91,28 +115,11 @@ void afb_sig_monitor(void (*function)(int sig, void*), void *closure, int timeou
        }
        else {
                error_handler = &jmpbuf;
-               if (timeout > 0) {
-                       timerset = 1; /* TODO: check statuses */
-                       sevp.sigev_notify = SIGEV_THREAD_ID;
-                       sevp.sigev_signo = SIGALRM;
-                       sevp.sigev_value.sival_ptr = NULL;
-#if defined(sigev_notify_thread_id)
-                       sevp.sigev_notify_thread_id = (pid_t)syscall(SYS_gettid);
-#else
-                       sevp._sigev_un._tid = (pid_t)syscall(SYS_gettid);
-#endif
-                       timer_create(CLOCK_THREAD_CPUTIME_ID, &sevp, &timerid);
-                       its.it_interval.tv_sec = 0;
-                       its.it_interval.tv_nsec = 0;
-                       its.it_value.tv_sec = timeout;
-                       its.it_value.tv_nsec = 0;
-                       timer_settime(timerid, 0, &its, NULL);
-               }
-
+               if (timeout)
+                       afb_thread_timer_arm(timeout);
                function(0, closure);
        }
-       if (timerset)
-               timer_delete(timerid);
+       afb_thread_timer_disarm();
        error_handler = older;
 }
 
index 3c9626c..6fa000e 100644 (file)
 
 #pragma once
 
+struct afb_req;
+
 extern int afb_sig_handler_init();
 
 extern void afb_sig_monitor(void (*function)(int sig, void*), void *closure, int timeout);
+extern int afb_sig_req(struct afb_req req, void (*callback)(struct afb_req req));
+extern int afb_sig_req_timeout(struct afb_req req, void (*callback)(struct afb_req req), int timeout);
 
diff --git a/src/afb-thread.c b/src/afb-thread.c
new file mode 100644 (file)
index 0000000..5ce0930
--- /dev/null
@@ -0,0 +1,378 @@
+/*
+ * Copyright (C) 2016 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <signal.h>
+#include <time.h>
+#include <sys/syscall.h>
+#include <pthread.h>
+#include <errno.h>
+#include <assert.h>
+
+#include <afb/afb-req-itf.h>
+
+#include "afb-thread.h"
+#include "afb-sig-handler.h"
+#include "verbose.h"
+
+/* control of threads */
+struct thread
+{
+       pthread_t tid;     /* the thread id */
+       unsigned stop: 1;  /* stop request */
+       unsigned ended: 1; /* ended status */
+       unsigned works: 1; /* is it processing a job? */
+};
+
+/* describes pending job */
+struct job
+{
+       void (*callback)(struct afb_req req); /* processing callback */
+       struct afb_req req; /* request to be processed */
+       int timeout;        /* timeout in second for processing the request */
+       int blocked;        /* is an other request blocking this one ? */
+       void *group;        /* group of the request */
+       struct job *next;   /* link to the next job enqueued */
+};
+
+/* synchronisation of threads */
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
+
+/* queue of pending jobs */
+static struct job *first_job = NULL;
+
+/* count allowed, started and running threads */
+static int allowed = 0;
+static int started = 0;
+static int running = 0;
+static int remains = 0;
+
+/* list of threads */
+static struct thread *threads = NULL;
+
+/* local timers */
+static _Thread_local int thread_timer_set;
+static _Thread_local timer_t thread_timerid;
+
+/*
+ * Creates a timer for the current thread
+ *
+ * Returns 0 in case of success
+ */
+int afb_thread_timer_create()
+{
+       int rc;
+       struct sigevent sevp;
+
+       if (thread_timer_set)
+               rc = 0;
+       else {
+               sevp.sigev_notify = SIGEV_THREAD_ID;
+               sevp.sigev_signo = SIGALRM;
+               sevp.sigev_value.sival_ptr = NULL;
+#if defined(sigev_notify_thread_id)
+               sevp.sigev_notify_thread_id = (pid_t)syscall(SYS_gettid);
+#else
+               sevp._sigev_un._tid = (pid_t)syscall(SYS_gettid);
+#endif
+               rc = timer_create(CLOCK_THREAD_CPUTIME_ID, &sevp, &thread_timerid);
+               thread_timer_set = !rc;
+       }
+       return 0;
+}
+
+/*
+ * Arms the alarm in timeout seconds for the current thread
+ */
+int afb_thread_timer_arm(int timeout)
+{
+       int rc;
+       struct itimerspec its;
+
+       rc = afb_thread_timer_create();
+       if (rc == 0) {
+               its.it_interval.tv_sec = 0;
+               its.it_interval.tv_nsec = 0;
+               its.it_value.tv_sec = timeout;
+               its.it_value.tv_nsec = 0;
+               rc = timer_settime(thread_timerid, 0, &its, NULL);
+       }
+
+       return rc;
+}
+
+/*
+ * Disarms the current alarm
+ */
+void afb_thread_timer_disarm()
+{
+       if (thread_timer_set)
+               afb_thread_timer_arm(0);
+}
+
+/*
+ * Delstroy any alarm resource for the current thread
+ */
+void afb_thread_timer_delete()
+{
+       if (thread_timer_set) {
+               timer_delete(thread_timerid);
+               thread_timer_set = 0;
+       }
+}
+
+/* add the job to the list */
+static inline void job_add(struct job *job)
+{
+       void *group = job->group;
+       struct job *ijob, **pjob;
+
+       pjob = &first_job;
+       ijob = first_job;
+       group = job->group;
+       if (group == NULL)
+               group = job;
+       while (ijob) {
+               if (ijob->group == group)
+                       job->blocked = 1;
+               pjob = &ijob->next;
+               ijob = ijob->next;
+       }
+       *pjob = job;
+       job->next = NULL;
+       remains--;
+}
+
+/* get the next job to process or NULL if none */
+static inline struct job *job_get()
+{
+       struct job *job, **pjob;
+       pjob = &first_job;
+       job = first_job;
+       while (job && job->blocked) {
+               pjob = &job->next;
+               job = job->next;
+       }
+       if (job) {
+               *pjob = job->next;
+               remains++;
+       }
+       return job;
+}
+
+/* unblock a group of job */
+static inline void job_unblock(void *group)
+{
+       struct job *job;
+
+       job = first_job;
+       while (job) {
+               if (job->group == group) {
+                       job->blocked = 0;
+                       break;
+               }
+               job = job->next;
+       }
+}
+
+/* main loop of processing threads */
+static void *thread_main_loop(void *data)
+{
+       struct thread *me = data;
+       struct job *job, j;
+
+       me->works = 0;
+       me->ended = 0;
+       afb_thread_timer_create();
+       pthread_mutex_lock(&mutex);
+       while (!me->stop) {
+               /* get a job */
+               job = job_get();
+               if (job == NULL && first_job != NULL && running == 0) {
+                       /* sad situation!! should not happen */
+                       ERROR("threads are blocked!");
+                       job = first_job;
+                       first_job = job->next;
+               }
+               if (job == NULL) {
+                       /* no job... */
+                       pthread_cond_wait(&cond, &mutex);
+               } else {
+                       /* run the job */
+                       running++;
+                       me->works = 1;
+                       pthread_mutex_unlock(&mutex);
+                       j = *job;
+                       free(job);
+                       afb_thread_timer_arm(j.timeout);
+                       afb_sig_req(j.req, j.callback);
+                       afb_thread_timer_disarm();
+                       afb_req_unref(j.req);
+                       pthread_mutex_lock(&mutex);
+                       if (j.group != NULL)
+                               job_unblock(j.group);
+                       me->works = 0;
+                       running--;
+               }
+
+       }
+       me->ended = 1;
+       pthread_mutex_unlock(&mutex);
+       afb_thread_timer_delete();
+       return me;
+}
+
+/* start a new thread */
+static int start_one_thread()
+{
+       struct thread *t;
+       int rc;
+
+       assert(started < allowed);
+
+       t = &threads[started++];
+       t->stop = 0;
+       rc = pthread_create(&t->tid, NULL, thread_main_loop, t);
+       if (rc != 0) {
+               started--;
+               errno = rc;
+               WARNING("not able to start thread: %m");
+               rc = -1;
+       }
+       return rc;
+}
+
+/* process the 'request' with the 'callback' using a separate thread if available */
+void afb_thread_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group)
+{
+       const char *info;
+       struct job *job;
+       int rc;
+
+       /* allocates the job */
+       job = malloc(sizeof *job);
+       if (job == NULL) {
+               info = "out of memory";
+               goto error;
+       }
+
+       /* start a thread if needed */
+       pthread_mutex_lock(&mutex);
+       if (remains == 0) {
+               info = "too many jobs";
+               goto error2;
+       }
+       if (started == running && started < allowed) {
+               rc = start_one_thread();
+               if (rc < 0 && started == 0) {
+                       /* failed to start threading */
+                       info = "can't start thread";
+                       goto error2;
+               }
+       }
+
+       /* fills and queues the job */
+       job->callback = callback;
+       job->req = req;
+       job->timeout = timeout;
+       job->group = group;
+       afb_req_addref(req);
+       job_add(job);
+       pthread_mutex_unlock(&mutex);
+
+       /* signal an existing job */
+       pthread_cond_signal(&cond);
+       return;
+
+error2:
+       pthread_mutex_unlock(&mutex);
+       free(job);
+error:
+       ERROR("can't process job with threads: %s", info);
+       afb_req_fail(req, "internal-error", info);
+}
+
+/* initialise the threads */
+int afb_thread_init(int allowed_count, int start_count, int waiter_count)
+{
+       threads = calloc(allowed_count, sizeof *threads);
+       if (threads == NULL) {
+               errno = ENOMEM;
+               ERROR("can't allocate threads");
+               return -1;
+       }
+
+       /* records the allowed count */
+       allowed = allowed_count;
+       started = 0;
+       running = 0;
+       remains = waiter_count;
+
+       /* start at least one thread */
+       pthread_mutex_lock(&mutex);
+       while (started < start_count && start_one_thread() == 0);
+       pthread_mutex_unlock(&mutex);
+
+       /* end */
+       return -(started != start_count);
+}
+
+/* terminate all the threads and all pending requests */
+void afb_thread_terminate()
+{
+       int i, n;
+       struct job *job;
+
+       /* request all threads to stop */
+       pthread_mutex_lock(&mutex);
+       allowed = 0;
+       n = started;
+       for (i = 0 ; i < n ; i++)
+               threads[i].stop = 1;
+
+       /* wait until all thread are terminated */
+       while (started != 0) {
+               /* signal threads */
+               pthread_mutex_unlock(&mutex);
+               pthread_cond_broadcast(&cond);
+               pthread_mutex_lock(&mutex);
+
+               /* join the terminated threads */
+               for (i = 0 ; i < n ; i++) {
+                       if (threads[i].tid && threads[i].ended) {
+                               pthread_join(threads[i].tid, NULL);
+                               threads[i].tid = 0;
+                               started--;
+                       }
+               }
+       }
+       pthread_mutex_unlock(&mutex);
+       free(threads);
+
+       /* cancel pending jobs */
+       while (first_job) {
+               job = first_job;
+               first_job = job->next;
+               afb_req_fail(job->req, "aborted", "termination of threading");
+               afb_req_unref(job->req);
+               free(job);
+       }
+}
diff --git a/src/afb-thread.h b/src/afb-thread.h
new file mode 100644 (file)
index 0000000..e25f90e
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (C) 2016 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+struct afb_req;
+
+extern void afb_thread_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group);
+
+extern int afb_thread_init(int allowed_count, int start_count, int waiter_count);
+extern void afb_thread_terminate();
+
+extern int afb_thread_timer_create();
+extern int afb_thread_timer_arm(int timeout);
+extern void afb_thread_timer_disarm();
+extern void afb_thread_timer_delete();
+
index 5871d12..3859818 100644 (file)
@@ -39,6 +39,7 @@
 #include "afb-context.h"
 #include "afb-hreq.h"
 #include "afb-sig-handler.h"
+#include "afb-thread.h"
 #include "session.h"
 #include "verbose.h"
 #include "afb-common.h"
@@ -650,12 +651,17 @@ int main(int argc, char *argv[])  {
   }
 
   if (afb_sig_handler_init() < 0) {
-     ERROR("main fail to initialise signal handlers");
+     ERROR("failed to initialise signal handlers");
      return 1;
   }
 
   if (afb_common_rootdir_set(config->rootdir) < 0) {
-     ERROR("main fail to set common root directory");
+     ERROR("failed to set common root directory");
+     return 1;
+  }
+
+  if (afb_thread_init(3, 1, 20) < 0) {
+     ERROR("failed to initialise threading");
      return 1;
   }
 
diff --git a/src/tests/test-thread.c b/src/tests/test-thread.c
new file mode 100644 (file)
index 0000000..cc676bd
--- /dev/null
@@ -0,0 +1,99 @@
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <pthread.h>
+#include <time.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+
+#include <afb/afb-req-itf.h>
+#include "../afb-thread.h"
+
+struct foo {
+       int value;
+       int refcount;
+};
+
+void addref(void *closure)
+{
+       struct foo *foo = closure;
+       foo->refcount++;
+}
+
+void unref(void *closure)
+{
+       struct foo *foo = closure;
+       if(!--foo->refcount) {
+               printf("%06d FREE\n", foo->value);
+               free(foo);
+       }
+}
+
+void fail(void *closure, const char *status, const char *info)
+{
+       struct foo *foo = closure;
+       printf("%06d ERROR %s\n", foo->value, status);
+}
+
+struct afb_req_itf itf = {
+       .json = NULL,
+       .get = NULL,
+
+       .success = NULL,
+       .fail = fail,
+
+       .raw = NULL,
+       .send = NULL,
+
+       .context_get = NULL,
+       .context_set = NULL,
+
+       .addref = addref,
+       .unref = unref,
+
+       .session_close = NULL,
+       .session_set_LOA = NULL,
+
+       .subscribe = NULL,
+       .unsubscribe = NULL,
+
+       .subcall = NULL
+};
+
+void process(struct afb_req req)
+{
+       struct timespec ts;
+       struct foo *foo = req.closure;
+       printf("%06d PROCESS T%d\n", foo->value, (int)syscall(SYS_gettid));
+       ts.tv_sec = 0;
+       ts.tv_nsec = foo->value * 1000;
+//     nanosleep(&ts, NULL);
+}
+
+int main()
+{
+       int i;
+       struct foo *foo;
+       struct afb_req req;
+       struct timespec ts;
+
+       req.itf = &itf;
+       afb_thread_init(4, 1);
+       for (i = 0 ; i  < 10000 ; i++) {
+               req.closure = foo = malloc(sizeof *foo);
+               foo->value = i;
+               foo->refcount = 1;
+               afb_thread_call(req, process, 5, (&ts) + (i % 4));
+               unref(foo);
+               ts.tv_sec = 0;
+               ts.tv_nsec = 1000000;
+//             nanosleep(&ts, NULL);
+       }
+       ts.tv_sec = 1;
+       ts.tv_nsec = 0;
+       nanosleep(&ts, NULL);
+       afb_thread_terminate();
+}
+
+
+
+
diff --git a/src/tests/test-thread.sh b/src/tests/test-thread.sh
new file mode 100755 (executable)
index 0000000..6d1b3a5
--- /dev/null
@@ -0,0 +1,4 @@
+#!/bin/sh
+
+cc test-thread.c ../afb-thread.c ../verbose.c ../afb-sig-handler.c -o test-thread -lrt -lpthread -I../../include
+./test-thread