2 * Copyright (C) 2016, 2017 "IoT.bzh"
3 * Author José Bollo <jose.bollo@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
24 #include <sys/syscall.h>
30 #include "sig-monitor.h"
33 /* control record of one thread (one entry of the 'threads' array) */
36 pthread_t tid; /* the thread id, filled by pthread_create */
37 unsigned stop: 1; /* stop request */
38 unsigned ended: 1; /* ended status, tested before joining the thread */
39 unsigned works: 1; /* is it processing a job? */
42 /* describes a pending job */
45 struct job *next; /* link to the next job enqueued */
46 void *group; /* group of the request (may be NULL) */
47 void (*callback)(int,void*,void*,void*); /* processing callback, called as callback(signum, arg1, arg2, arg3) */
48 void *arg1; /* first arg */
49 void *arg2; /* second arg */
50 void *arg3; /* third arg */
51 int timeout; /* timeout in seconds for processing the request */
52 int blocked; /* is another request blocking this one? */
55 /* synchronisation of threads: 'mutex' is held around accesses to the
   shared state below; 'cond' is waited on by the processing threads and
   signalled when a job is queued or when termination is requested */
56 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
57 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
59 /* queue of pending jobs (head of a list linked through job->next) */
60 static struct job *first_job = NULL;
62 /* count allowed, started and running threads */
/* upper bound of started threads, recorded by jobs_init */
63 static int allowed = 0;
/* count of started threads; also the index of the next free slot in 'threads' */
64 static int started = 0;
/* NOTE(review): presumably the count of threads currently processing a job;
   its updates are not visible in this excerpt -- confirm */
65 static int running = 0;
/* set from 'waiter_count' by jobs_init; NOTE(review): presumably the count of
   jobs that may still be queued -- confirm against jobs_queue3 */
66 static int remains = 0;
/* array of thread records allocated by jobs_init */
69 static struct thread *threads = NULL;
71 /* add the job to the list of pending jobs */
72 static inline void job_add(struct job *job)
74 void *group = job->group;
75 struct job *ijob, **pjob;
/* a job without group uses its own address as comparison key
   (GNU 'a ?: b' extension: yields 'a' when 'a' is not null, else 'b') */
79 group = job->group ? : job;
/* an already queued job has the same group key; NOTE(review): the action
   taken is outside this excerpt, presumably marking 'job' as blocked */
81 if (ijob->group == group)
91 /* get the next job to process or NULL if none */
92 static inline struct job *job_get()
94 struct job *job, **pjob;
/* skip over the jobs that are currently blocked */
97 while (job && job->blocked) {
108 /* unblock a group of jobs */
109 static inline void job_unblock(void *group)
/* a pending job belonging to 'group' was found */
115 if (job->group == group) {
/* sig_monitor adapter running a job: 'arg' is the job to process and
   'signum' is forwarded unchanged to the job's callback */
124 static inline void job_call(int signum, void *arg)
126 struct job *job = arg;
127 job->callback(signum, job->arg1, job->arg2, job->arg3);
/* sig_monitor adapter cancelling a job: 'arg' is the job; the received
   'signum' is ignored and the job's callback is always invoked with SIGABRT */
131 static inline void job_cancel(int signum, void *arg)
133 struct job *job = arg;
134 job->callback(SIGABRT, job->arg1, job->arg2, job->arg3);
137 /* main loop of processing threads;
   'data' is the struct thread record given to pthread_create */
138 static void *thread_main_loop(void *data)
140 struct thread *me = data;
145 sig_monitor_init_timeouts();
146 pthread_mutex_lock(&mutex);
/* no job was selected although the queue is not empty and no thread is
   running */
150 if (job == NULL && first_job != NULL && running == 0) {
151 /* sad situation!! should not happen */
152 ERROR("threads are blocked!");
154 first_job = job->next;
/* sleep until signalled; 'mutex' is released while waiting and held again
   on return */
158 pthread_cond_wait(&cond, &mutex);
/* the lock is released while the job's callback runs under sig_monitor
   with the job's timeout, then taken again */
163 pthread_mutex_unlock(&mutex);
164 sig_monitor(job->timeout, job_call, job);
165 pthread_mutex_lock(&mutex);
/* the job is processed: unblock its group, if it has one */
168 if (job->group != NULL)
169 job_unblock(job->group);
175 pthread_mutex_unlock(&mutex);
176 sig_monitor_clean_timeouts();
180 /* start a new thread;
   both visible callers (jobs_queue3 and jobs_init) hold 'mutex' when calling */
181 static int start_one_thread()
/* the caller must ensure a free slot exists in 'threads' */
186 assert(started < allowed);
/* take the next free thread record and count it as started */
188 t = &threads[started++];
190 rc = pthread_create(&t->tid, NULL, thread_main_loop, t);
194 WARNING("not able to start thread: %m");
203 void (*callback)(int, void*),
/*
 * Forwards to jobs_queue3 with the two unused arguments set to NULL.
 * NOTE(review): the one-argument callback is cast to the four-argument
 * callback type and is later invoked with four arguments (see the
 * job->callback(...) call in job_call). Calling a function through a
 * pointer to an incompatible function type is undefined behaviour in
 * ISO C; this relies on the platform calling convention tolerating the
 * extra arguments.
 */
206 return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg, NULL, NULL);
212 void (*callback)(int, void*, void*),
/*
 * Forwards to jobs_queue3 with the unused third argument set to NULL.
 * NOTE(review): the two-argument callback is cast to the four-argument
 * callback type and is later invoked with four arguments (see the
 * job->callback(...) call in job_call). Calling a function through a
 * pointer to an incompatible function type is undefined behaviour in
 * ISO C; this relies on the platform calling convention tolerating the
 * extra argument.
 */
216 return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg1, arg2, NULL);
219 /* queue the job to the 'callback' using a separate thread if available;
   on failure an error is logged with the reason recorded in 'info' */
223 void (*callback)(int, void*, void *, void*),
232 /* allocates the job */
233 job = malloc(sizeof *job);
236 info = "out of memory";
240 /* start a thread if needed */
241 pthread_mutex_lock(&mutex);
244 info = "too many jobs";
/* every started thread is counted as running and the limit of threads is
   not reached: try to start one more */
247 if (started == running && started < allowed) {
248 rc = start_one_thread();
/* failing to start a thread is fatal only when no thread exists at all */
249 if (rc < 0 && started == 0) {
250 /* failed to start threading */
251 info = "can't start first thread";
256 /* fills and queues the job */
258 job->timeout = timeout;
259 job->callback = callback;
265 pthread_mutex_unlock(&mutex);
267 /* wake up one thread waiting on 'cond' */
268 pthread_cond_signal(&cond);
/* NOTE(review): presumably the error path -- the lock is released before
   the failure is reported */
272 pthread_mutex_unlock(&mutex);
275 ERROR("can't process job with threads: %s, %m", info);
279 /* initialise the threads
 *
 * allowed_count: count of thread records allocated, recorded in 'allowed'
 * start_count:   count of threads to start immediately
 * waiter_count:  recorded in 'remains'
 *
 * Returns 0 when exactly 'start_count' threads are started, -1 otherwise.
 *
 * NOTE(review): start_one_thread asserts 'started < allowed', so calling
 * with start_count > allowed_count trips that assertion once 'allowed'
 * threads are started.
 */
280 int jobs_init(int allowed_count, int start_count, int waiter_count)
/* zero-filled array of thread records, one per allowed thread */
282 threads = calloc(allowed_count, sizeof *threads);
283 if (threads == NULL) {
285 ERROR("can't allocate threads");
289 /* records the allowed count */
290 allowed = allowed_count;
293 remains = waiter_count;
295 /* start threads until 'start_count' are started or one fails to start */
296 pthread_mutex_lock(&mutex);
297 while (started < start_count && start_one_thread() == 0);
298 pthread_mutex_unlock(&mutex);
/* (started != start_count) is 0 or 1, hence the result is 0 or -1 */
301 return -(started != start_count);
304 /* terminate all the threads and all pending requests */
305 void jobs_terminate()
310 /* request all threads to stop */
311 pthread_mutex_lock(&mutex);
314 for (i = 0 ; i < n ; i++)
317 /* wait until all threads are terminated */
318 while (started != 0) {
/* wake up every thread waiting on 'cond'; the lock is released around
   the broadcast and taken again afterwards */
320 pthread_mutex_unlock(&mutex);
321 pthread_cond_broadcast(&cond);
322 pthread_mutex_lock(&mutex);
324 /* join the terminated threads */
325 for (i = 0 ; i < n ; i++) {
/* NOTE(review): pthread_t is an opaque type in POSIX; testing 'tid' as a
   truth value only works where pthread_t is a scalar */
326 if (threads[i].tid && threads[i].ended) {
327 pthread_join(threads[i].tid, NULL);
333 pthread_mutex_unlock(&mutex);
336 /* cancel pending jobs: each job is removed from the queue and its
   callback is invoked with SIGABRT through job_cancel, without timeout */
339 first_job = job->next;
340 sig_monitor(0, job_cancel, job);