2 * Copyright (C) 2016, 2017 "IoT.bzh"
3 * Author José Bollo <jose.bollo@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
24 #include <sys/syscall.h>
30 #include "sig-monitor.h"
33 /** control of threads */
36 struct thread *next; /**< next thread of the list */
37 pthread_t tid; /**< the thread id */
38 unsigned stop: 1; /**< stop request */
41 /* describes pending job */
44 struct job *next; /* link to the next job enqueued */
45 void *group; /* group of the request */
46 void (*callback)(int,void*,void*,void*); /* processing callback */
47 void *arg1; /* first arg */
48 void *arg2; /* second arg */
49 void *arg3; /* third arg */
50 int timeout; /* timeout in second for processing the request */
51 int blocked; /* is another request blocking this one? */
54 /* synchronisation of threads */
55 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
56 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
58 /* count allowed, started and running threads */
59 static int allowed = 0; /** allowed count of threads */
60 static int started = 0; /** started count of threads */
61 static int running = 0; /** running count of threads */
62 static int remains = 0; /** remaining count of jobs that can be created */
65 static struct thread *threads;
67 /* queue of pending jobs */
68 static struct job *first_job;
69 static struct job *first_evloop;
70 static struct job *free_jobs;
73 * Adds the 'job' at the end of the list of jobs, marking it
74 * as blocked if another job with the same group is pending.
75 * @param job the job to add
77 static inline void job_add(struct job *job)
80 struct job *ijob, **pjob;
84 group = job->group ? : (void*)(intptr_t)1;
86 if (ijob->group == group)
97 * Get the next job to process or NULL if none.
98 * The returned job if any is removed from the list of
100 * @return the job to process
102 static inline struct job *job_get()
104 struct job *job, **pjob;
108 while (job && job->blocked) {
120 * Unblock the first pending job of a group (if any)
121 * @param group the group to unblock
123 static inline void job_unblock(void *group)
129 if (job->group == group) {
137 static struct job *job_create(
140 void (*callback)(int, void*, void *, void*),
147 /* allocates the job */
150 pthread_mutex_unlock(&mutex);
151 job = malloc(sizeof *job);
152 pthread_mutex_lock(&mutex);
159 job->timeout = timeout;
160 job->callback = callback;
169 static inline void job_destroy(struct job *job)
171 job->next = free_jobs;
175 static inline void job_release(struct job *job)
178 job_unblock(job->group);
182 /** monitored call to the job */
183 static void job_call(int signum, void *arg)
185 struct job *job = arg;
186 job->callback(signum, job->arg1, job->arg2, job->arg3);
189 /** monitored cancel of the job */
190 static void job_cancel(int signum, void *arg)
192 job_call(SIGABRT, arg);
195 /* main loop of processing threads */
196 static void *thread_main_loop(void *data)
198 struct thread me, **prv;
202 me.tid = pthread_self();
204 sig_monitor_init_timeouts();
207 pthread_mutex_lock(&mutex);
211 /* loop until stopped */
216 if (!job && first_job && running == 0) {
217 /* sad situation!! should not happen */
218 ERROR("threads are blocked!");
220 first_job = job->next;
224 pthread_mutex_unlock(&mutex);
225 sig_monitor(job->timeout, job_call, job);
226 pthread_mutex_lock(&mutex);
229 /* no job, check evloop */
233 first_evloop = job->next;
234 pthread_mutex_unlock(&mutex);
235 sig_monitor(job->timeout, job_call, job);
236 pthread_mutex_lock(&mutex);
237 job->next = first_evloop;
240 /* no job and no evloop */
242 pthread_cond_wait(&cond, &mutex);
254 pthread_mutex_unlock(&mutex);
256 /* uninit and terminate */
257 sig_monitor_clean_timeouts();
261 /* start a new thread */
262 static int start_one_thread()
267 assert(started < allowed);
270 rc = pthread_create(&tid, NULL, thread_main_loop, NULL);
274 WARNING("not able to start thread: %m");
283 void (*callback)(int, void*),
286 return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg, NULL, NULL);
292 void (*callback)(int, void*, void*),
296 return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg1, arg2, NULL);
299 /* queue the job to the 'callback' using a separate thread if available */
303 void (*callback)(int, void*, void *, void*),
312 pthread_mutex_lock(&mutex);
314 /* allocates the job */
315 job = job_create(group, timeout, callback, arg1, arg2, arg3);
318 info = "out of memory";
322 /* start a thread if needed */
325 info = "too many jobs";
328 if (started == running && started < allowed) {
329 rc = start_one_thread();
330 if (rc < 0 && started == 0) {
331 /* failed to start threading */
332 info = "can't start first thread";
339 pthread_mutex_unlock(&mutex);
341 /* signal a waiting thread */
342 pthread_cond_signal(&cond);
348 ERROR("can't process job with threads: %s, %m", info);
349 pthread_mutex_unlock(&mutex);
353 /* initialise the threads */
354 int jobs_init(int allowed_count, int start_count, int waiter_count)
356 /* records the allowed count */
357 allowed = allowed_count;
360 remains = waiter_count;
362 /* start the initial threads, up to start_count */
363 pthread_mutex_lock(&mutex);
364 while (started < start_count && start_one_thread() == 0);
365 pthread_mutex_unlock(&mutex);
368 return -(started != start_count);
371 /* terminate all the threads and all pending requests */
372 void jobs_terminate()
381 /* request all threads to stop */
382 pthread_mutex_lock(&mutex);
385 /* search the next thread to stop */
387 while (t && pthread_equal(t->tid, me))
394 pthread_mutex_unlock(&mutex);
395 pthread_cond_broadcast(&cond);
396 pthread_join(other, NULL);
397 pthread_mutex_lock(&mutex);
400 /* cancel pending jobs */
403 first_job = job->next;
404 sig_monitor(0, job_cancel, job);
409 int jobs_add_event_loop(void *key, int timeout, void (*evloop)(int signum, void*), void *closure)
413 pthread_mutex_lock(&mutex);
414 job = job_create(key, timeout, (void (*)(int, void *, void *, void *))evloop, closure, NULL, NULL);
417 job->next = first_evloop;
420 /* signal the loop */
421 pthread_cond_signal(&cond);
423 pthread_mutex_unlock(&mutex);
435 /* check that this thread is not already running */
436 pthread_mutex_lock(&mutex);
439 if (pthread_equal(t->tid, me)) {
440 pthread_mutex_unlock(&mutex);
441 ERROR("thread already running");
450 pthread_mutex_unlock(&mutex);
453 thread_main_loop(NULL);
456 pthread_mutex_lock(&mutex);
458 pthread_mutex_unlock(&mutex);