2 * Copyright (C) 2016, 2017 "IoT.bzh"
3 * Author José Bollo <jose.bollo@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
24 #include <sys/syscall.h>
30 #include "sig-monitor.h"
33 /** control of threads: node of the linked list of threads */
36 struct thread *next; /**< next thread of the list */
37 pthread_t tid; /**< the thread id */
38 unsigned stop: 1; /**< stop request (1-bit flag) */
41 /* describes a pending job */
44 struct job *next; /* link to the next job enqueued */
45 void *group; /* group of the request */
46 void (*callback)(int,void*,void*,void*); /* processing callback, called as callback(signum, arg1, arg2, arg3) */
47 void *arg1; /* first arg */
48 void *arg2; /* second arg */
49 void *arg3; /* third arg */
50 int timeout; /* timeout in seconds for processing the request */
51 int blocked; /* is another request blocking this one? */
54 /* synchronisation of threads */
55 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* taken around the accesses to the counters and lists below */
56 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; /* waited on by idle threads, signalled when a job or an event loop is queued */
58 /* count allowed, started and running threads */
59 static int allowed = 0; /** allowed count of threads */
60 static int started = 0; /** started count of threads */
61 static int running = 0; /** running count of threads */
62 static int remains = 0; /** remaining count of jobs that can be created */
/* presumably the head of the list of 'struct thread' entries -- TODO confirm */
65 static struct thread *threads;
67 /* queue of pending jobs */
68 static struct job *first_job; /* head of the queue of pending jobs */
69 static struct job *first_evloop; /* head of the list of event loop jobs (filled by jobs_add_event_loop) */
70 static struct job *free_jobs; /* list of job structures kept for reuse (fed by job_destroy, consumed by job_create) */
73 * Adds the 'job' at the end of the list of jobs, marking it
74 * as blocked if another job with the same group is pending.
75 * @param job the job to add
77 static inline void job_add(struct job *job)
80 struct job *ijob, **pjob;
/* a NULL group is replaced by the non-NULL placeholder (void*)1
 * (uses the GNU 'a ?: b' conditional with omitted middle operand) */
84 group = job->group ? : (void*)(intptr_t)1;
/* an already queued job belongs to the same group */
86 if (ijob->group == group)
97 * Get the next job to process or NULL if none.
98 * The returned job if any is removed from the list of
100 * @return the job to process
102 static inline struct job *job_get()
104 struct job *job, **pjob;
/* skip the jobs that are marked as blocked */
108 while (job && job->blocked) {
120 * Unblock the first pending job of a group (if any)
121 * @param group the group to unblock
123 static inline void job_unblock(void *group)
/* a pending job of the requested group is found */
129 if (job->group == group) {
/*
 * Gets a job structure, using the free_jobs list and malloc, and records
 * the given timeout and callback in it.
 * The mutex is unlocked and locked again around malloc, so the caller is
 * expected to hold it on entry (jobs_queue3 and jobs_add_event_loop do).
 */
137 static struct job *job_create(
140 void (*callback)(int, void*, void *, void*),
147 /* allocates the job */
/* take the job in front of the free list */
150 free_jobs = job->next;
/* the allocation itself is made without holding the mutex */
152 pthread_mutex_unlock(&mutex);
153 job = malloc(sizeof *job);
154 pthread_mutex_lock(&mutex);
161 job->timeout = timeout;
162 job->callback = callback;
171 static inline void job_destroy(struct job *job)
/* link the job in front of the free_jobs list */
173 job->next = free_jobs;
177 static inline void job_release(struct job *job)
/* unblock the first pending job (if any) of the group of the released job */
180 job_unblock(job->group);
184 /** monitored call to the job: 'arg' is the job to call, 'signum' is forwarded to its callback */
185 static void job_call(int signum, void *arg)
187 struct job *job = arg;
/* the callback receives the signal number and the three recorded arguments */
188 job->callback(signum, job->arg1, job->arg2, job->arg3);
191 /** monitored cancel of the job: 'arg' is the job to cancel */
192 static void job_cancel(int signum, void *arg)
/* the job callback is invoked with SIGABRT as signal number,
 * whatever the value of 'signum' received here */
194 job_call(SIGABRT, arg);
197 /* main loop of processing threads */
198 static void *thread_main_loop(void *data)
/* 'me' is the descriptor of the current thread, stored on its own stack */
200 struct thread me, **prv;
204 me.tid = pthread_self();
206 sig_monitor_init_timeouts();
/* the mutex is held from here, except while a job is being run */
209 pthread_mutex_lock(&mutex);
213 /* loop until stopped */
/* no job was obtained although the queue is not empty and no thread is counted as running */
218 if (!job && first_job && running == 0) {
219 /* sad situation!! should not happen */
220 ERROR("threads are blocked!");
222 first_job = job->next;
/* the job is run under sig_monitor, with its own timeout, without holding the mutex */
226 pthread_mutex_unlock(&mutex);
227 sig_monitor(job->timeout, job_call, job);
228 pthread_mutex_lock(&mutex);
231 /* no job, check evloop */
/* the event loop job is removed from the list while it runs ... */
235 first_evloop = job->next;
236 pthread_mutex_unlock(&mutex);
237 sig_monitor(job->timeout, job_call, job);
238 pthread_mutex_lock(&mutex);
/* ... and linked again in front of the list afterwards */
239 job->next = first_evloop;
242 /* no job and no evloop */
/* sleep until 'cond' is signalled; the mutex is released during the wait */
244 pthread_cond_wait(&cond, &mutex);
256 pthread_mutex_unlock(&mutex);
258 /* uninit and terminate */
259 sig_monitor_clean_timeouts();
263 /* start a new thread running thread_main_loop */
264 static int start_one_thread()
/* callers must only ask for a new thread while the allowed count is not reached */
269 assert(started < allowed);
/* default attributes, and NULL as argument of thread_main_loop */
272 rc = pthread_create(&tid, NULL, thread_main_loop, NULL);
276 WARNING("not able to start thread: %m");
285 void (*callback)(int, void*),
288 return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg, NULL, NULL);
294 void (*callback)(int, void*, void*),
298 return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg1, arg2, NULL);
301 /* queue the job to the 'callback' using a separate thread if available */
305 void (*callback)(int, void*, void *, void*),
314 pthread_mutex_lock(&mutex);
316 /* allocates the job */
317 job = job_create(group, timeout, callback, arg1, arg2, arg3);
320 info = "out of memory";
324 /* start a thread if needed */
327 info = "too many jobs";
/* every started thread is running a job and the allowed count is not reached */
330 if (started == running && started < allowed) {
331 rc = start_one_thread();
/* a failed start only matters when no thread at all is started */
332 if (rc < 0 && started == 0) {
333 /* failed to start threading */
334 info = "can't start first thread";
341 pthread_mutex_unlock(&mutex);
343 /* wake up one thread waiting for a job */
344 pthread_cond_signal(&cond);
/* error path: 'info' holds the reason recorded above */
350 ERROR("can't process job with threads: %s, %m", info);
351 pthread_mutex_unlock(&mutex);
355 /* initialise the threads
 *
 * @param allowed_count value recorded as the allowed count of threads
 * @param start_count   count of threads to start immediately
 * @param waiter_count  initial value of 'remains', the remaining count
 *                      of jobs that can be created
 * @return 0 when 'started' equals start_count on exit, -1 otherwise
 */
356 int jobs_init(int allowed_count, int start_count, int waiter_count)
358 /* records the allowed count */
359 allowed = allowed_count;
362 remains = waiter_count;
364 /* start threads until start_count are started or a start fails */
365 pthread_mutex_lock(&mutex);
366 while (started < start_count && start_one_thread() == 0);
367 pthread_mutex_unlock(&mutex);
/* -(1) when the counts differ, -(0) when they match */
370 return -(started != start_count);
373 /* terminate all the threads and all pending requests */
374 void jobs_terminate(int wait)
383 /* request all threads to stop */
384 pthread_mutex_lock(&mutex);
387 /* search the next thread to stop */
/* entries whose thread id equals 'me' are skipped
 * ('me' is presumably the calling thread -- TODO confirm) */
389 while (t && pthread_equal(t->tid, me))
/* the mutex is released while all waiting threads are woken up
 * and the thread 'other' is joined */
396 pthread_mutex_unlock(&mutex);
397 pthread_cond_broadcast(&cond);
398 pthread_join(other, NULL);
399 pthread_mutex_lock(&mutex);
402 /* cancel pending jobs */
405 first_job = job->next;
/* the job is called through job_cancel (callback invoked with SIGABRT),
 * with 0 given as timeout to sig_monitor */
406 sig_monitor(0, job_cancel, job);
/*
 * Records an event loop as a job linked to the first_evloop list.
 *
 * @param key     used as the group of the created job
 * @param timeout timeout recorded in the created job
 * @param evloop  function of the event loop, recorded as the job callback
 * @param closure recorded as first argument of the job
 */
411 int jobs_add_event_loop(void *key, int timeout, void (*evloop)(int signum, void*), void *closure)
415 pthread_mutex_lock(&mutex);
/* NOTE(review): 'evloop' takes 2 arguments but is stored, and later called,
 * through a 4-argument function pointer type; this relies on the calling
 * convention tolerating the extra arguments */
416 job = job_create(key, timeout, (void (*)(int, void *, void *, void *))evloop, closure, NULL, NULL);
/* link the job in front of the list of event loops */
419 job->next = first_evloop;
422 /* signal the loop */
423 pthread_cond_signal(&cond);
425 pthread_mutex_unlock(&mutex);
437 /* check that the calling thread is not already running */
438 pthread_mutex_lock(&mutex);
441 if (pthread_equal(t->tid, me)) {
442 pthread_mutex_unlock(&mutex);
443 ERROR("thread already running");
452 pthread_mutex_unlock(&mutex);
455 thread_main_loop(NULL);
458 pthread_mutex_lock(&mutex);
460 pthread_mutex_unlock(&mutex);