2 * Copyright (C) 2016, 2017 "IoT.bzh"
3 * Author José Bollo <jose.bollo@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
24 #include <sys/syscall.h>
30 #include "sig-monitor.h"
33 /* describes pending job */
36 struct job *next; /* link to the next job enqueued */
37 void *group; /* group of the request */
38 void (*callback)(int,void*,void*,void*); /* processing callback */
39 void *arg1; /* first arg */
40 void *arg2; /* second arg */
41 void *arg3; /* second arg */
42 int timeout; /* timeout in second for processing the request */
43 int blocked; /* is an other request blocking this one ? */
46 /** control of threads */
49 struct thread *next; /**< next thread of the list */
50 struct thread *upper; /**< upper same thread */
51 struct job *job; /**< currently processed job */
52 pthread_t tid; /**< the thread id */
53 unsigned stop: 1; /**< stop requested */
54 unsigned lowered: 1; /**< has a lower same thread */
57 /* synchronisation of threads */
58 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
59 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
61 /* count allowed, started and running threads */
62 static int allowed = 0; /** allowed count of threads */
63 static int started = 0; /** started count of threads */
64 static int running = 0; /** running count of threads */
65 static int remains = 0; /** remaining count of jobs that can be created */
68 static struct thread *threads;
69 static _Thread_local struct thread *current;
71 /* queue of pending jobs */
72 static struct job *first_job;
73 static struct job *first_evloop;
74 static struct job *free_jobs;
77 * Create a new job with the given parameters
78 * @param group the group of the job
79 * @param timeout the timeout of the job (0 if none)
80 * @param callback the function that achieves the job
81 * @param arg1 the first argument of the callback
82 * @param arg2 the second argument of the callback
83 * @param arg3 the third argument of the callback
84 * @return the created job unblock or NULL when no more memory
86 static struct job *job_create(
89 void (*callback)(int, void*, void *, void*),
96 /* try recyle existing job */
99 free_jobs = job->next;
101 /* allocation without blocking */
102 pthread_mutex_unlock(&mutex);
103 job = malloc(sizeof *job);
104 pthread_mutex_lock(&mutex);
110 /* initialises the job */
112 job->timeout = timeout;
113 job->callback = callback;
123 * Adds 'job1' and 'job2' at the end of the list of jobs, marking it
124 * as blocked if an other job with the same group is pending.
125 * @param job1 the first job to add
126 * @param job2 the second job to add or NULL
128 static void job_add2(struct job *job1, struct job *job2)
130 void *group1, *group2, *group;
131 struct job *ijob, **pjob;
134 group1 = job1->group;
140 group2 = job2->group;
141 if (group2 && group2 == group1)
145 /* search end and blackers */
165 * Get the next job to process or NULL if none.
166 * The returned job if any isn't removed from
168 * @return the job to process
170 static inline struct job *job_get()
175 while (job && job->blocked)
181 * Releases the processed 'job'
182 * @param job the job to release
184 static inline void job_release(struct job *job)
186 struct job *ijob, **pjob;
189 /* first unqueue the job */
192 while (ijob != job) {
198 /* then unblock jobs of the same group */
202 while (ijob && ijob->group != group)
208 /* recycle the job */
209 job->next = free_jobs;
213 /** monitored call to the job */
214 static void job_call(int signum, void *arg)
216 struct job *job = arg;
217 job->callback(signum, job->arg1, job->arg2, job->arg3);
220 /** monitored cancel of the job */
221 static void job_cancel(int signum, void *arg)
223 job_call(SIGABRT, arg);
226 /* main loop of processing threads */
227 static void thread_run(struct thread *me)
233 me->tid = pthread_self();
238 current->lowered = 1;
240 sig_monitor_init_timeouts();
245 /* loop until stopped */
250 if (!job && first_job && running == 0) {
251 /* sad situation!! should not happen */
252 ERROR("threads are blocked!");
254 first_job = job->next;
261 pthread_mutex_unlock(&mutex);
262 sig_monitor(job->timeout, job_call, job);
263 pthread_mutex_lock(&mutex);
266 /* no job, check evloop */
270 first_evloop = job->next;
271 pthread_mutex_unlock(&mutex);
272 sig_monitor(job->timeout, job_call, job);
273 pthread_mutex_lock(&mutex);
274 job->next = first_evloop;
277 /* no job and not evloop */
279 pthread_cond_wait(&cond, &mutex);
293 current->lowered = 0;
295 sig_monitor_clean_timeouts();
296 pthread_mutex_unlock(&mutex);
299 /* main loop of processing threads */
300 static void *thread_create(void *data)
304 pthread_mutex_lock(&mutex);
306 pthread_mutex_unlock(&mutex);
310 /* start a new thread */
311 static int start_one_thread()
316 assert(started < allowed);
319 rc = pthread_create(&tid, NULL, thread_create, NULL);
323 WARNING("not able to start thread: %m");
329 static int start_one_thread_if_needed()
333 if (started == running && started < allowed) {
334 /* all threads are busy and a new can be started */
335 rc = start_one_thread();
336 if (rc < 0 && started == 0)
337 return rc; /* no thread available */
345 void (*callback)(int signum))
347 return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, NULL, NULL, NULL);
353 void (*callback)(int, void*),
356 return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg, NULL, NULL);
362 void (*callback)(int, void*, void*),
366 return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg1, arg2, NULL);
369 /* queue the job to the 'callback' using a separate thread if available */
373 void (*callback)(int, void*, void *, void*),
382 pthread_mutex_lock(&mutex);
384 /* allocates the job */
385 job = job_create(group, timeout, callback, arg1, arg2, arg3);
388 info = "out of memory";
392 /* check availability */
395 info = "too many jobs";
399 /* start a thread if needed */
400 rc = start_one_thread_if_needed();
402 /* failed to start threading */
403 info = "can't start first thread";
410 pthread_mutex_unlock(&mutex);
412 /* signal an existing job */
413 pthread_cond_signal(&cond);
417 job->next = free_jobs;
420 ERROR("can't process job with threads: %s, %m", info);
421 pthread_mutex_unlock(&mutex);
425 /* initialise the threads */
426 int jobs_init(int allowed_count, int start_count, int waiter_count)
428 /* records the allowed count */
429 allowed = allowed_count;
432 remains = waiter_count;
434 /* start at least one thread */
435 pthread_mutex_lock(&mutex);
436 while (started < start_count && start_one_thread() == 0);
437 pthread_mutex_unlock(&mutex);
440 return -(started != start_count);
445 void (*callback)(int signum))
447 return jobs_invoke3(timeout, (void(*)(int,void*,void*,void*))callback, NULL, NULL, NULL);
452 void (*callback)(int, void*),
455 return jobs_invoke3(timeout, (void(*)(int,void*,void*,void*))callback, arg, NULL, NULL);
460 void (*callback)(int, void*, void*),
464 return jobs_invoke3(timeout, (void(*)(int,void*,void*,void*))callback, arg1, arg2, NULL);
467 static void unlock_invoker(int signum, void *arg1, void *arg2, void *arg3)
469 struct thread *t = arg1;
470 pthread_mutex_lock(&mutex);
472 pthread_mutex_unlock(&mutex);
475 /* invoke the job to the 'callback' using a separate thread if available */
478 void (*callback)(int, void*, void *, void*),
484 struct job *job1, *job2;
488 pthread_mutex_lock(&mutex);
490 /* allocates the job */
491 job1 = job_create(&me, timeout, callback, arg1, arg2, arg3);
492 job2 = job_create(&me, 0, unlock_invoker, &me, NULL, NULL);
493 if (!job1 || !job2) {
495 info = "out of memory";
499 /* start a thread if needed */
500 rc = start_one_thread_if_needed();
502 /* failed to start threading */
503 info = "can't start first thread";
508 job_add2(job1, job2);
510 /* run untill stopped */
512 pthread_mutex_unlock(&mutex);
517 job1->next = free_jobs;
521 job2->next = free_jobs;
524 ERROR("can't process job with threads: %s, %m", info);
525 pthread_mutex_unlock(&mutex);
529 /* terminate all the threads and all pending requests */
530 void jobs_terminate()
532 struct job *job, *head, *tail;
539 /* request all threads to stop */
540 pthread_mutex_lock(&mutex);
543 /* search the next thread to stop */
545 while (t && pthread_equal(t->tid, me))
552 pthread_mutex_unlock(&mutex);
553 pthread_cond_broadcast(&cond);
554 pthread_join(other, NULL);
555 pthread_mutex_lock(&mutex);
558 /* cancel pending jobs of other threads */
567 /* search if job is stacked for current */
569 while (t && t->job != job)
572 /* yes, relink it at end */
580 /* no cancel the job */
581 pthread_mutex_unlock(&mutex);
582 sig_monitor(0, job_cancel, job);
584 pthread_mutex_lock(&mutex);
587 pthread_mutex_unlock(&mutex);
590 int jobs_add_event_loop(void *key, int timeout, void (*evloop)(int signum, void*), void *closure)
594 pthread_mutex_lock(&mutex);
595 job = job_create(key, timeout, (void (*)(int, void *, void *, void *))evloop, closure, NULL, NULL);
598 job->next = first_evloop;
601 /* signal the loop */
602 pthread_cond_signal(&cond);
604 pthread_mutex_unlock(&mutex);
612 /* check whether already running */
614 ERROR("thread already running");
620 pthread_mutex_lock(&mutex);
624 pthread_mutex_unlock(&mutex);