/* * Copyright (C) 2016, 2017 "IoT.bzh" * Author José Bollo * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define _GNU_SOURCE #include #include #include #include #include #include #include #include #include "jobs.h" #include "sig-monitor.h" #include "verbose.h" /* control of threads */ struct thread { pthread_t tid; /* the thread id */ unsigned stop: 1; /* stop request */ unsigned ended: 1; /* ended status */ unsigned works: 1; /* is it processing a job? */ }; /* describes pending job */ struct job { struct job *next; /* link to the next job enqueued */ void *group; /* group of the request */ void (*callback)(int,void*,void*,void*); /* processing callback */ void *arg1; /* first arg */ void *arg2; /* second arg */ void *arg3; /* second arg */ int timeout; /* timeout in second for processing the request */ int blocked; /* is an other request blocking this one ? */ }; /* synchronisation of threads */ static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; /* queue of pending jobs */ static struct job *first_job = NULL; /* count allowed, started and running threads */ static int allowed = 0; static int started = 0; static int running = 0; static int remains = 0; /* list of threads */ static struct thread *threads = NULL; /* add the job to the list */ static inline void job_add(struct job *job) { void *group = job->group; struct job *ijob, **pjob; pjob = &first_job; ijob = first_job; group = job->group ? : job; while (ijob) { if (ijob->group == group) job->blocked = 1; pjob = &ijob->next; ijob = ijob->next; } *pjob = job; job->next = NULL; remains--; } /* get the next job to process or NULL if none */ static inline struct job *job_get() { struct job *job, **pjob; pjob = &first_job; job = first_job; while (job && job->blocked) { pjob = &job->next; job = job->next; } if (job) { *pjob = job->next; remains++; } return job; } /* unblock a group of job */ static inline void job_unblock(void *group) { struct job *job; job = first_job; while (job) { if (job->group == group) { job->blocked = 0; break; } job = job->next; } } /* call the job */ static inline void job_call(int signum, void *arg) { struct job *job = arg; job->callback(signum, job->arg1, job->arg2, job->arg3); } /* cancel the job */ static inline void job_cancel(int signum, void *arg) { struct job *job = arg; job->callback(SIGABRT, job->arg1, job->arg2, job->arg3); } /* main loop of processing threads */ static void *thread_main_loop(void *data) { struct thread *me = data; struct job *job; me->works = 0; me->ended = 0; sig_monitor_init_timeouts(); pthread_mutex_lock(&mutex); while (!me->stop) { /* get a job */ job = job_get(); if (job == NULL && first_job != NULL && running == 0) { /* sad situation!! should not happen */ ERROR("threads are blocked!"); job = first_job; first_job = job->next; } if (job == NULL) { /* no job... */ pthread_cond_wait(&cond, &mutex); } else { /* run the job */ running++; me->works = 1; pthread_mutex_unlock(&mutex); sig_monitor(job->timeout, job_call, job); pthread_mutex_lock(&mutex); me->works = 0; running--; if (job->group != NULL) job_unblock(job->group); free(job); } } me->ended = 1; pthread_mutex_unlock(&mutex); sig_monitor_clean_timeouts(); return me; } /* start a new thread */ static int start_one_thread() { struct thread *t; int rc; assert(started < allowed); t = &threads[started++]; t->stop = 0; rc = pthread_create(&t->tid, NULL, thread_main_loop, t); if (rc != 0) { started--; errno = rc; WARNING("not able to start thread: %m"); rc = -1; } return rc; } int jobs_queue( void *group, int timeout, void (*callback)(int, void*), void *arg) { return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg, NULL, NULL); } int jobs_queue2( void *group, int timeout, void (*callback)(int, void*, void*), void *arg1, void *arg2) { return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg1, arg2, NULL); } /* queue the job to the 'callback' using a separate thread if available */ int jobs_queue3( void *group, int timeout, void (*callback)(int, void*, void *, void*), void *arg1, void *arg2, void *arg3) { const char *info; struct job *job; int rc; /* allocates the job */ job = malloc(sizeof *job); if (job == NULL) { errno = ENOMEM; info = "out of memory"; goto error; } /* start a thread if needed */ pthread_mutex_lock(&mutex); if (remains == 0) { errno = EBUSY; info = "too many jobs"; goto error2; } if (started == running && started < allowed) { rc = start_one_thread(); if (rc < 0 && started == 0) { /* failed to start threading */ info = "can't start first thread"; goto error2; } } /* fills and queues the job */ job->group = group; job->timeout = timeout; job->callback = callback; job->arg1 = arg1; job->arg2 = arg2; job->arg3 = arg3; job->blocked = 0; job_add(job); pthread_mutex_unlock(&mutex); /* signal an existing job */ pthread_cond_signal(&cond); return 0; error2: pthread_mutex_unlock(&mutex); free(job); error: ERROR("can't process job with threads: %s, %m", info); return -1; } /* initialise the threads */ int jobs_init(int allowed_count, int start_count, int waiter_count) { threads = calloc(allowed_count, sizeof *threads); if (threads == NULL) { errno = ENOMEM; ERROR("can't allocate threads"); return -1; } /* records the allowed count */ allowed = allowed_count; started = 0; running = 0; remains = waiter_count; /* start at least one thread */ pthread_mutex_lock(&mutex); while (started < start_count && start_one_thread() == 0); pthread_mutex_unlock(&mutex); /* end */ return -(started != start_count); } /* terminate all the threads and all pending requests */ void jobs_terminate() { int i, n; struct job *job; /* request all threads to stop */ pthread_mutex_lock(&mutex); allowed = 0; n = started; for (i = 0 ; i < n ; i++) threads[i].stop = 1; /* wait until all thread are terminated */ while (started != 0) { /* signal threads */ pthread_mutex_unlock(&mutex); pthread_cond_broadcast(&cond); pthread_mutex_lock(&mutex); /* join the terminated threads */ for (i = 0 ; i < n ; i++) { if (threads[i].tid && threads[i].ended) { pthread_join(threads[i].tid, NULL); threads[i].tid = 0; started--; } } } pthread_mutex_unlock(&mutex); free(threads); /* cancel pending jobs */ while (first_job) { job = first_job; first_job = job->next; sig_monitor(0, job_cancel, job); free(job); } }