/*
- * Copyright (C) 2016 "IoT.bzh"
+ * Copyright (C) 2016, 2017 "IoT.bzh"
* Author José Bollo <jose.bollo@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
#define _GNU_SOURCE
-#include <stdlib.h>
-#include <unistd.h>
-#include <signal.h>
-#include <time.h>
-#include <sys/syscall.h>
-#include <pthread.h>
-#include <errno.h>
-#include <assert.h>
+#include <string.h>
#include <afb/afb-req-itf.h>
#include "afb-thread.h"
-#include "afb-sig-handler.h"
+#include "jobs.h"
+#include "sig-monitor.h"
#include "verbose.h"
-/* control of threads */
-struct thread
+static void req_call(int signum, void *arg1, void *arg2, void *arg3)
{
- pthread_t tid; /* the thread id */
- unsigned stop: 1; /* stop request */
- unsigned ended: 1; /* ended status */
- unsigned works: 1; /* is it processing a job? */
-};
+ struct afb_req req = { .itf = arg1, .closure = arg2 };
+ void (*callback)(struct afb_req) = arg3;
-/* describes pending job */
-struct job
-{
- void (*callback)(struct afb_req req); /* processing callback */
- struct afb_req req; /* request to be processed */
- int timeout; /* timeout in second for processing the request */
- int blocked; /* is an other request blocking this one ? */
- void *group; /* group of the request */
- struct job *next; /* link to the next job enqueued */
-};
-
-/* synchronisation of threads */
-static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-
-/* queue of pending jobs */
-static struct job *first_job = NULL;
-
-/* count allowed, started and running threads */
-static int allowed = 0;
-static int started = 0;
-static int running = 0;
-static int remains = 0;
-
-/* list of threads */
-static struct thread *threads = NULL;
-
-/* local timers */
-static _Thread_local int thread_timer_set;
-static _Thread_local timer_t thread_timerid;
-
-/*
- * Creates a timer for the current thread
- *
- * Returns 0 in case of success
- */
-int afb_thread_timer_create()
-{
- int rc;
- struct sigevent sevp;
-
- if (thread_timer_set)
- rc = 0;
- else {
- sevp.sigev_notify = SIGEV_THREAD_ID;
- sevp.sigev_signo = SIGALRM;
- sevp.sigev_value.sival_ptr = NULL;
-#if defined(sigev_notify_thread_id)
- sevp.sigev_notify_thread_id = (pid_t)syscall(SYS_gettid);
-#else
- sevp._sigev_un._tid = (pid_t)syscall(SYS_gettid);
-#endif
- rc = timer_create(CLOCK_THREAD_CPUTIME_ID, &sevp, &thread_timerid);
- thread_timer_set = !rc;
- }
- return 0;
+ if (signum != 0)
+ afb_req_fail_f(req, "aborted", "signal %s(%d) caught", strsignal(signum), signum);
+ else
+ callback(req);
+ afb_req_unref(req);
}
-/*
- * Arms the alarm in timeout seconds for the current thread
- */
-int afb_thread_timer_arm(int timeout)
+void afb_thread_req_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group)
{
int rc;
- struct itimerspec its;
-
- rc = afb_thread_timer_create();
- if (rc == 0) {
- its.it_interval.tv_sec = 0;
- its.it_interval.tv_nsec = 0;
- its.it_value.tv_sec = timeout;
- its.it_value.tv_nsec = 0;
- rc = timer_settime(thread_timerid, 0, &its, NULL);
- }
- return rc;
-}
-
-/*
- * Disarms the current alarm
- */
-void afb_thread_timer_disarm()
-{
- if (thread_timer_set)
- afb_thread_timer_arm(0);
-}
-
-/*
- * Delstroy any alarm resource for the current thread
- */
-void afb_thread_timer_delete()
-{
- if (thread_timer_set) {
- timer_delete(thread_timerid);
- thread_timer_set = 0;
- }
-}
-
-/* add the job to the list */
-static inline void job_add(struct job *job)
-{
- void *group = job->group;
- struct job *ijob, **pjob;
-
- pjob = &first_job;
- ijob = first_job;
- group = job->group;
- if (group == NULL)
- group = job;
- while (ijob) {
- if (ijob->group == group)
- job->blocked = 1;
- pjob = &ijob->next;
- ijob = ijob->next;
- }
- *pjob = job;
- job->next = NULL;
- remains--;
-}
-
-/* get the next job to process or NULL if none */
-static inline struct job *job_get()
-{
- struct job *job, **pjob;
- pjob = &first_job;
- job = first_job;
- while (job && job->blocked) {
- pjob = &job->next;
- job = job->next;
- }
- if (job) {
- *pjob = job->next;
- remains++;
- }
- return job;
-}
-
-/* unblock a group of job */
-static inline void job_unblock(void *group)
-{
- struct job *job;
-
- job = first_job;
- while (job) {
- if (job->group == group) {
- job->blocked = 0;
- break;
- }
- job = job->next;
- }
-}
-
-/* main loop of processing threads */
-static void *thread_main_loop(void *data)
-{
- struct thread *me = data;
- struct job *job, j;
-
- me->works = 0;
- me->ended = 0;
- afb_thread_timer_create();
- pthread_mutex_lock(&mutex);
- while (!me->stop) {
- /* get a job */
- job = job_get();
- if (job == NULL && first_job != NULL && running == 0) {
- /* sad situation!! should not happen */
- ERROR("threads are blocked!");
- job = first_job;
- first_job = job->next;
- }
- if (job == NULL) {
- /* no job... */
- pthread_cond_wait(&cond, &mutex);
- } else {
- /* run the job */
- running++;
- me->works = 1;
- pthread_mutex_unlock(&mutex);
- j = *job;
- free(job);
- afb_thread_timer_arm(j.timeout);
- afb_sig_req(j.req, j.callback);
- afb_thread_timer_disarm();
- afb_req_unref(j.req);
- pthread_mutex_lock(&mutex);
- if (j.group != NULL)
- job_unblock(j.group);
- me->works = 0;
- running--;
- }
-
- }
- me->ended = 1;
- pthread_mutex_unlock(&mutex);
- afb_thread_timer_delete();
- return me;
-}
-
-/* start a new thread */
-static int start_one_thread()
-{
- struct thread *t;
- int rc;
-
- assert(started < allowed);
-
- t = &threads[started++];
- t->stop = 0;
- rc = pthread_create(&t->tid, NULL, thread_main_loop, t);
- if (rc != 0) {
- started--;
- errno = rc;
- WARNING("not able to start thread: %m");
- rc = -1;
- }
- return rc;
-}
-
-/* process the 'request' with the 'callback' using a separate thread if available */
-void afb_thread_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group)
-{
- const char *info;
- struct job *job;
- int rc;
-
- /* allocates the job */
- job = malloc(sizeof *job);
- if (job == NULL) {
- info = "out of memory";
- goto error;
- }
-
- /* start a thread if needed */
- pthread_mutex_lock(&mutex);
- if (remains == 0) {
- info = "too many jobs";
- goto error2;
- }
- if (started == running && started < allowed) {
- rc = start_one_thread();
- if (rc < 0 && started == 0) {
- /* failed to start threading */
- info = "can't start thread";
- goto error2;
- }
- }
-
- /* fills and queues the job */
- job->callback = callback;
- job->req = req;
- job->timeout = timeout;
- job->group = group;
afb_req_addref(req);
- job_add(job);
- pthread_mutex_unlock(&mutex);
-
- /* signal an existing job */
- pthread_cond_signal(&cond);
- return;
-
-error2:
- pthread_mutex_unlock(&mutex);
- free(job);
-error:
- ERROR("can't process job with threads: %s", info);
- afb_req_fail(req, "internal-error", info);
-}
-
-/* initialise the threads */
-int afb_thread_init(int allowed_count, int start_count, int waiter_count)
-{
- threads = calloc(allowed_count, sizeof *threads);
- if (threads == NULL) {
- errno = ENOMEM;
- ERROR("can't allocate threads");
- return -1;
- }
-
- /* records the allowed count */
- allowed = allowed_count;
- started = 0;
- running = 0;
- remains = waiter_count;
-
- /* start at least one thread */
- pthread_mutex_lock(&mutex);
- while (started < start_count && start_one_thread() == 0);
- pthread_mutex_unlock(&mutex);
-
- /* end */
- return -(started != start_count);
-}
-
-/* terminate all the threads and all pending requests */
-void afb_thread_terminate()
-{
- int i, n;
- struct job *job;
-
- /* request all threads to stop */
- pthread_mutex_lock(&mutex);
- allowed = 0;
- n = started;
- for (i = 0 ; i < n ; i++)
- threads[i].stop = 1;
-
- /* wait until all thread are terminated */
- while (started != 0) {
- /* signal threads */
- pthread_mutex_unlock(&mutex);
- pthread_cond_broadcast(&cond);
- pthread_mutex_lock(&mutex);
-
- /* join the terminated threads */
- for (i = 0 ; i < n ; i++) {
- if (threads[i].tid && threads[i].ended) {
- pthread_join(threads[i].tid, NULL);
- threads[i].tid = 0;
- started--;
- }
+ if (0) {
+ /* no threading */
+ sig_monitor3(timeout, req_call, (void*)req.itf, req.closure, callback);
+ } else {
+ /* threading */
+ rc = jobs_queue3(group, timeout, req_call, (void*)req.itf, req.closure, callback);
+ if (rc < 0) {
+ /* TODO: allows or not to proccess it directly as when no threading? (see above) */
+ ERROR("can't process job with threads: %m");
+ afb_req_fail_f(req, "cancelled", "not able to pipe a job for the task");
+ afb_req_unref(req);
}
}
- pthread_mutex_unlock(&mutex);
- free(threads);
-
- /* cancel pending jobs */
- while (first_job) {
- job = first_job;
- first_job = job->next;
- afb_req_fail(job->req, "aborted", "termination of threading");
- afb_req_unref(job->req);
- free(job);
- }
}
+