Refactor of threading and signal monitor
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <signal.h>
23 #include <time.h>
24 #include <sys/syscall.h>
25 #include <pthread.h>
26 #include <errno.h>
27 #include <assert.h>
28
29 #include "jobs.h"
30 #include "sig-monitor.h"
31 #include "verbose.h"
32
/* bookkeeping kept for each worker thread */
struct thread
{
        pthread_t tid;            /* identifier filled in by pthread_create */
        unsigned int stop : 1;    /* set to request the thread to leave its loop */
        unsigned int ended : 1;   /* set by the thread when it has left its loop */
        unsigned int works : 1;   /* set while the thread is processing a job */
};
41
/* description of a job waiting in the queue */
struct job
{
        struct job *next;   /* following job in the queue, NULL for the last one */
        void *group;        /* group of the job, NULL when it has no group */
        void (*callback)(int, void*, void*, void*);  /* function processing the job */
        void *arg1;         /* first argument passed to the callback */
        void *arg2;         /* second argument passed to the callback */
        void *arg3;         /* third argument passed to the callback */
        int timeout;        /* timeout in second for processing the job */
        int blocked;        /* not zero while an other job of the group must go first */
};
54
/* mutex guarding the state below and condition used to wake the threads */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;

/* head of the queue of pending jobs (static storage: starts as NULL) */
static struct job *first_job;

/* counters (static storage: all start at zero) */
static int allowed;   /* maximum count of threads that may be started */
static int started;   /* count of threads currently started */
static int running;   /* count of threads currently processing a job */
static int remains;   /* count of jobs that can still be added to the queue */

/* array of 'allowed' thread records, the first 'started' ones are in use */
static struct thread *threads;
70
71 /* add the job to the list */
72 static inline void job_add(struct job *job)
73 {
74         void *group = job->group;
75         struct job *ijob, **pjob;
76
77         pjob = &first_job;
78         ijob = first_job;
79         group = job->group ? : job;
80         while (ijob) {
81                 if (ijob->group == group)
82                         job->blocked = 1;
83                 pjob = &ijob->next;
84                 ijob = ijob->next;
85         }
86         *pjob = job;
87         job->next = NULL;
88         remains--;
89 }
90
91 /* get the next job to process or NULL if none */
92 static inline struct job *job_get()
93 {
94         struct job *job, **pjob;
95         pjob = &first_job;
96         job = first_job;
97         while (job && job->blocked) {
98                 pjob = &job->next;
99                 job = job->next;
100         }
101         if (job) {
102                 *pjob = job->next;
103                 remains++;
104         }
105         return job;
106 }
107
108 /* unblock a group of job */
109 static inline void job_unblock(void *group)
110 {
111         struct job *job;
112
113         job = first_job;
114         while (job) {
115                 if (job->group == group) {
116                         job->blocked = 0;
117                         break;
118                 }
119                 job = job->next;
120         }
121 }
122
123 /* call the job */
124 static inline void job_call(int signum, void *arg)
125 {
126         struct job *job = arg;
127         job->callback(signum, job->arg1, job->arg2, job->arg3);
128 }
129
130 /* cancel the job */
131 static inline void job_cancel(int signum, void *arg)
132 {
133         struct job *job = arg;
134         job->callback(SIGABRT, job->arg1, job->arg2, job->arg3);
135 }
136
137 /* main loop of processing threads */
138 static void *thread_main_loop(void *data)
139 {
140         struct thread *me = data;
141         struct job *job;
142
143         me->works = 0;
144         me->ended = 0;
145         sig_monitor_init_timeouts();
146         pthread_mutex_lock(&mutex);
147         while (!me->stop) {
148                 /* get a job */
149                 job = job_get();
150                 if (job == NULL && first_job != NULL && running == 0) {
151                         /* sad situation!! should not happen */
152                         ERROR("threads are blocked!");
153                         job = first_job;
154                         first_job = job->next;
155                 }
156                 if (job == NULL) {
157                         /* no job... */
158                         pthread_cond_wait(&cond, &mutex);
159                 } else {
160                         /* run the job */
161                         running++;
162                         me->works = 1;
163                         pthread_mutex_unlock(&mutex);
164                         sig_monitor(job->timeout, job_call, job);
165                         pthread_mutex_lock(&mutex);
166                         me->works = 0;
167                         running--;
168                         if (job->group != NULL)
169                                 job_unblock(job->group);
170                         free(job);
171                 }
172
173         }
174         me->ended = 1;
175         pthread_mutex_unlock(&mutex);
176         sig_monitor_clean_timeouts();
177         return me;
178 }
179
180 /* start a new thread */
181 static int start_one_thread()
182 {
183         struct thread *t;
184         int rc;
185
186         assert(started < allowed);
187
188         t = &threads[started++];
189         t->stop = 0;
190         rc = pthread_create(&t->tid, NULL, thread_main_loop, t);
191         if (rc != 0) {
192                 started--;
193                 errno = rc;
194                 WARNING("not able to start thread: %m");
195                 rc = -1;
196         }
197         return rc;
198 }
199
/*
 * Queues a job whose callback receives a single argument.
 * Forwards to jobs_queue3 with the second and third arguments set to NULL
 * and returns its result.
 *
 * NOTE(review): the callback is later invoked through the 4-argument
 * function type; this relies on the platform calling convention tolerating
 * the extra arguments - confirm for any new target.
 */
int jobs_queue(
                void *group,
                int timeout,
                void (*callback)(int, void*),
                void *arg)
{
        void (*widened)(int, void*, void*, void*);

        widened = (void(*)(int,void*,void*,void*))callback;
        return jobs_queue3(group, timeout, widened, arg, NULL, NULL);
}
208
/*
 * Queues a job whose callback receives two arguments.
 * Forwards to jobs_queue3 with the third argument set to NULL and returns
 * its result.
 *
 * NOTE(review): the callback is later invoked through the 4-argument
 * function type; this relies on the platform calling convention tolerating
 * the extra argument - confirm for any new target.
 */
int jobs_queue2(
                void *group,
                int timeout,
                void (*callback)(int, void*, void*),
                void *arg1,
                void *arg2)
{
        void (*widened)(int, void*, void*, void*);

        widened = (void(*)(int,void*,void*,void*))callback;
        return jobs_queue3(group, timeout, widened, arg1, arg2, NULL);
}
218
219 /* queue the job to the 'callback' using a separate thread if available */
220 int jobs_queue3(
221                 void *group,
222                 int timeout,
223                 void (*callback)(int, void*, void *, void*),
224                 void *arg1,
225                 void *arg2,
226                 void *arg3)
227 {
228         const char *info;
229         struct job *job;
230         int rc;
231
232         /* allocates the job */
233         job = malloc(sizeof *job);
234         if (job == NULL) {
235                 errno = ENOMEM;
236                 info = "out of memory";
237                 goto error;
238         }
239
240         /* start a thread if needed */
241         pthread_mutex_lock(&mutex);
242         if (remains == 0) {
243                 errno = EBUSY;
244                 info = "too many jobs";
245                 goto error2;
246         }
247         if (started == running && started < allowed) {
248                 rc = start_one_thread();
249                 if (rc < 0 && started == 0) {
250                         /* failed to start threading */
251                         info = "can't start first thread";
252                         goto error2;
253                 }
254         }
255
256         /* fills and queues the job */
257         job->group = group;
258         job->timeout = timeout;
259         job->callback = callback;
260         job->arg1 = arg1;
261         job->arg2 = arg2;
262         job->arg3 = arg3;
263         job->blocked = 0;
264         job_add(job);
265         pthread_mutex_unlock(&mutex);
266
267         /* signal an existing job */
268         pthread_cond_signal(&cond);
269         return 0;
270
271 error2:
272         pthread_mutex_unlock(&mutex);
273         free(job);
274 error:
275         ERROR("can't process job with threads: %s, %m", info);
276         return -1;
277 }
278
279 /* initialise the threads */
280 int jobs_init(int allowed_count, int start_count, int waiter_count)
281 {
282         threads = calloc(allowed_count, sizeof *threads);
283         if (threads == NULL) {
284                 errno = ENOMEM;
285                 ERROR("can't allocate threads");
286                 return -1;
287         }
288
289         /* records the allowed count */
290         allowed = allowed_count;
291         started = 0;
292         running = 0;
293         remains = waiter_count;
294
295         /* start at least one thread */
296         pthread_mutex_lock(&mutex);
297         while (started < start_count && start_one_thread() == 0);
298         pthread_mutex_unlock(&mutex);
299
300         /* end */
301         return -(started != start_count);
302 }
303
304 /* terminate all the threads and all pending requests */
305 void jobs_terminate()
306 {
307         int i, n;
308         struct job *job;
309
310         /* request all threads to stop */
311         pthread_mutex_lock(&mutex);
312         allowed = 0;
313         n = started;
314         for (i = 0 ; i < n ; i++)
315                 threads[i].stop = 1;
316
317         /* wait until all thread are terminated */
318         while (started != 0) {
319                 /* signal threads */
320                 pthread_mutex_unlock(&mutex);
321                 pthread_cond_broadcast(&cond);
322                 pthread_mutex_lock(&mutex);
323
324                 /* join the terminated threads */
325                 for (i = 0 ; i < n ; i++) {
326                         if (threads[i].tid && threads[i].ended) {
327                                 pthread_join(threads[i].tid, NULL);
328                                 threads[i].tid = 0;
329                                 started--;
330                         }
331                 }
332         }
333         pthread_mutex_unlock(&mutex);
334         free(threads);
335
336         /* cancel pending jobs */
337         while (first_job) {
338                 job = first_job;
339                 first_job = job->next;
340                 sig_monitor(0, job_cancel, job);
341                 free(job);
342         }
343 }
344