Make main thread used for common jobs
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <signal.h>
23 #include <time.h>
24 #include <sys/syscall.h>
25 #include <pthread.h>
26 #include <errno.h>
27 #include <assert.h>
28
29 #include "jobs.h"
30 #include "sig-monitor.h"
31 #include "verbose.h"
32
/**
 * Bookkeeping record of one thread taking part in the processing of jobs.
 * The records are chained together through 'next'.
 */
struct thread {
	/** following record in the list of threads */
	struct thread *next;

	/** identifier of the thread */
	pthread_t tid;

	/** set to 1 to request the thread to leave its processing loop */
	unsigned stop: 1;
};
40
/**
 * Description of a job waiting to be processed.
 */
struct job {
	/* following job in the list holding this one */
	struct job *next;

	/* group of the job, NULL when the job has no group */
	void *group;

	/* function called to process the job: (signum, arg1, arg2, arg3) */
	void (*callback)(int, void *, void *, void *);

	/* first argument given to the callback */
	void *arg1;

	/* second argument given to the callback */
	void *arg2;

	/* third argument given to the callback */
	void *arg3;

	/* timeout in seconds for processing the job */
	int timeout;

	/* not zero when a job of the same group must be processed before */
	int blocked;
};
53
/* lock and condition variable shared by all the threads of this file */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;

/* counters about threads and jobs */
static int allowed = 0; /** count of threads that are allowed */
static int started = 0; /** count of threads that were started */
static int running = 0; /** count of threads that are not waiting */
static int remains = 0; /** count of jobs that can still be queued */

/* head of the list of the records of the threads */
static struct thread *threads;

/* heads of the lists of jobs */
static struct job *first_job;    /* queue of the pending jobs */
static struct job *first_evloop; /* list of the event loops */
static struct job *free_jobs;    /* list of the job records to be reused */
71
72 /**
73  * Adds the 'job' at the end of the list of jobs, marking it
74  * as blocked if an other job with the same group is pending.
75  * @param job the job to add
76  */
77 static inline void job_add(struct job *job)
78 {
79         void *group;
80         struct job *ijob, **pjob;
81
82         pjob = &first_job;
83         ijob = first_job;
84         group = job->group ? : (void*)(intptr_t)1;
85         while (ijob) {
86                 if (ijob->group == group)
87                         job->blocked = 1;
88                 pjob = &ijob->next;
89                 ijob = ijob->next;
90         }
91         job->next = NULL;
92         *pjob = job;
93         remains--;
94 }
95
96 /**
97  * Get the next job to process or NULL if none.
98  * The returned job if any is removed from the list of
99  * jobs.
100  * @return the job to process
101  */
102 static inline struct job *job_get()
103 {
104         struct job *job, **pjob;
105
106         pjob = &first_job;
107         job = first_job;
108         while (job && job->blocked) {
109                 pjob = &job->next;
110                 job = job->next;
111         }
112         if (job) {
113                 *pjob = job->next;
114                 remains++;
115         }
116         return job;
117 }
118
119 /**
120  * Unblock the first pending job of a group (if any)
121  * @param group the group to unblock
122  */
123 static inline void job_unblock(void *group)
124 {
125         struct job *job;
126
127         job = first_job;
128         while (job) {
129                 if (job->group == group) {
130                         job->blocked = 0;
131                         break;
132                 }
133                 job = job->next;
134         }
135 }
136
137 static struct job *job_create(
138                 void *group,
139                 int timeout,
140                 void (*callback)(int, void*, void *, void*),
141                 void *arg1,
142                 void *arg2,
143                 void *arg3)
144 {
145         struct job *job;
146
147         /* allocates the job */
148         job = free_jobs;
149         if (!job) {
150                 pthread_mutex_unlock(&mutex);
151                 job = malloc(sizeof *job);
152                 pthread_mutex_lock(&mutex);
153                 if (!job) {
154                         errno = -ENOMEM;
155                         goto end;
156                 }
157         }
158         job->group = group;
159         job->timeout = timeout;
160         job->callback = callback;
161         job->arg1 = arg1;
162         job->arg2 = arg2;
163         job->arg3 = arg3;
164         job->blocked = 0;
165 end:
166         return job;
167 }
168
169 static inline void job_destroy(struct job *job)
170 {
171         job->next = free_jobs;
172         free_jobs = job;
173 }
174
175 static inline void job_release(struct job *job)
176 {
177         if (job->group)
178                 job_unblock(job->group);
179         job_destroy(job);
180 }
181
182 /** monitored call to the job */
183 static void job_call(int signum, void *arg)
184 {
185         struct job *job = arg;
186         job->callback(signum, job->arg1, job->arg2, job->arg3);
187 }
188
189 /** monitored cancel of the job */
190 static void job_cancel(int signum, void *arg)
191 {
192         job_call(SIGABRT, arg);
193 }
194
195 /* main loop of processing threads */
196 static void *thread_main_loop(void *data)
197 {
198         struct thread me, **prv;
199         struct job *job;
200
201         /* init */
202         me.tid = pthread_self();
203         me.stop = 0;
204         sig_monitor_init_timeouts();
205
206         /* chain in */
207         pthread_mutex_lock(&mutex);
208         me.next = threads;
209         threads = &me;
210
211         /* loop until stopped */
212         running++;
213         while (!me.stop) {
214                 /* get a job */
215                 job = job_get();
216                 if (!job && first_job && running == 0) {
217                         /* sad situation!! should not happen */
218                         ERROR("threads are blocked!");
219                         job = first_job;
220                         first_job = job->next;
221                 }
222                 if (job) {
223                         /* run the job */
224                         pthread_mutex_unlock(&mutex);
225                         sig_monitor(job->timeout, job_call, job);
226                         pthread_mutex_lock(&mutex);
227                         job_release(job);
228                 } else {
229                         /* no job, check evloop */
230                         job = first_evloop;
231                         if (job) {
232                                 /* evloop */
233                                 first_evloop = job->next;
234                                 pthread_mutex_unlock(&mutex);
235                                 sig_monitor(job->timeout, job_call, job);
236                                 pthread_mutex_lock(&mutex);
237                                 job->next = first_evloop;
238                                 first_evloop = job;
239                         } else {
240                                 /* no job and not evloop */
241                                 running--;
242                                 pthread_cond_wait(&cond, &mutex);
243                                 running++;
244                         }
245                 }
246         }
247         running--;
248
249         /* chain out */
250         prv = &threads;
251         while (*prv != &me)
252                 prv = &(*prv)->next;
253         *prv = me.next;
254         pthread_mutex_unlock(&mutex);
255
256         /* uninit and terminate */
257         sig_monitor_clean_timeouts();
258         return NULL;
259 }
260
261 /* start a new thread */
262 static int start_one_thread()
263 {
264         pthread_t tid;
265         int rc;
266
267         assert(started < allowed);
268
269         started++;
270         rc = pthread_create(&tid, NULL, thread_main_loop, NULL);
271         if (rc != 0) {
272                 started--;
273                 errno = rc;
274                 WARNING("not able to start thread: %m");
275                 rc = -1;
276         }
277         return rc;
278 }
279
/**
 * Queues a job whose callback receives one argument.
 * Forwards to 'jobs_queue3' with NULL as second and third arguments.
 *
 * @param group    the group of the job (NULL if none)
 * @param timeout  the timeout of the job, in seconds
 * @param callback the function processing the job
 * @param arg      the argument for the callback
 * @return the value returned by 'jobs_queue3'
 */
int jobs_queue(
		void *group,
		int timeout,
		void (*callback)(int, void*),
		void *arg)
{
	void (*generic)(int, void *, void *, void *);

	generic = (void (*)(int, void *, void *, void *))callback;
	return jobs_queue3(group, timeout, generic, arg, NULL, NULL);
}
288
/**
 * Queues a job whose callback receives two arguments.
 * Forwards to 'jobs_queue3' with NULL as third argument.
 *
 * @param group    the group of the job (NULL if none)
 * @param timeout  the timeout of the job, in seconds
 * @param callback the function processing the job
 * @param arg1     the first argument for the callback
 * @param arg2     the second argument for the callback
 * @return the value returned by 'jobs_queue3'
 */
int jobs_queue2(
		void *group,
		int timeout,
		void (*callback)(int, void*, void*),
		void *arg1,
		void *arg2)
{
	void (*generic)(int, void *, void *, void *);

	generic = (void (*)(int, void *, void *, void *))callback;
	return jobs_queue3(group, timeout, generic, arg1, arg2, NULL);
}
298
299 /* queue the job to the 'callback' using a separate thread if available */
300 int jobs_queue3(
301                 void *group,
302                 int timeout,
303                 void (*callback)(int, void*, void *, void*),
304                 void *arg1,
305                 void *arg2,
306                 void *arg3)
307 {
308         const char *info;
309         struct job *job;
310         int rc;
311
312         pthread_mutex_lock(&mutex);
313
314         /* allocates the job */
315         job = job_create(group, timeout, callback, arg1, arg2, arg3);
316         if (!job) {
317                 errno = ENOMEM;
318                 info = "out of memory";
319                 goto error;
320         }
321
322         /* start a thread if needed */
323         if (remains == 0) {
324                 errno = EBUSY;
325                 info = "too many jobs";
326                 goto error2;
327         }
328         if (started == running && started < allowed) {
329                 rc = start_one_thread();
330                 if (rc < 0 && started == 0) {
331                         /* failed to start threading */
332                         info = "can't start first thread";
333                         goto error2;
334                 }
335         }
336
337         /* queues the job */
338         job_add(job);
339         pthread_mutex_unlock(&mutex);
340
341         /* signal an existing job */
342         pthread_cond_signal(&cond);
343         return 0;
344
345 error2:
346         job_destroy(job);
347 error:
348         ERROR("can't process job with threads: %s, %m", info);
349         pthread_mutex_unlock(&mutex);
350         return -1;
351 }
352
353 /* initialise the threads */
354 int jobs_init(int allowed_count, int start_count, int waiter_count)
355 {
356         /* records the allowed count */
357         allowed = allowed_count;
358         started = 0;
359         running = 0;
360         remains = waiter_count;
361
362         /* start at least one thread */
363         pthread_mutex_lock(&mutex);
364         while (started < start_count && start_one_thread() == 0);
365         pthread_mutex_unlock(&mutex);
366
367         /* end */
368         return -(started != start_count);
369 }
370
371 /* terminate all the threads and all pending requests */
372 void jobs_terminate()
373 {
374         struct job *job;
375         pthread_t me, other;
376         struct thread *t;
377
378         /* how am i? */
379         me = pthread_self();
380
381         /* request all threads to stop */
382         pthread_mutex_lock(&mutex);
383         allowed = 0;
384         for(;;) {
385                 /* search the next thread to stop */
386                 t = threads;
387                 while (t && pthread_equal(t->tid, me))
388                         t = t->next;
389                 if (!t)
390                         break;
391                 /* stop it */
392                 other = t->tid;
393                 t->stop = 1;
394                 pthread_mutex_unlock(&mutex);
395                 pthread_cond_broadcast(&cond);
396                 pthread_join(other, NULL);
397                 pthread_mutex_lock(&mutex);
398         }
399
400         /* cancel pending jobs */
401         while (first_job) {
402                 job = first_job;
403                 first_job = job->next;
404                 sig_monitor(0, job_cancel, job);
405                 free(job);
406         }
407 }
408
409 int jobs_add_event_loop(void *key, int timeout, void (*evloop)(int signum, void*), void *closure)
410 {
411         struct job *job;
412
413         pthread_mutex_lock(&mutex);
414         job = job_create(key, timeout, (void (*)(int,  void *, void *, void *))evloop, closure, NULL, NULL);
415         if (job) {
416                 /* adds the loop */
417                 job->next = first_evloop;
418                 first_evloop = job;
419
420                 /* signal the loop */
421                 pthread_cond_signal(&cond);
422         }
423         pthread_mutex_unlock(&mutex);
424         return -!job;
425 }
426
427 int jobs_add_me()
428 {
429         pthread_t me;
430         struct thread *t;
431
432         /* how am i? */
433         me = pthread_self();
434
435         /* request all threads to stop */
436         pthread_mutex_lock(&mutex);
437         t = threads;
438         while (t) {
439                 if (pthread_equal(t->tid, me)) {
440                         pthread_mutex_unlock(&mutex);
441                         ERROR("thread already running");
442                         errno = EINVAL;
443                         return -1;
444                 }
445                 t = t->next;
446         }
447
448         /* allowed... */
449         allowed++;
450         pthread_mutex_unlock(&mutex);
451
452         /* run */
453         thread_main_loop(NULL);
454
455         /* returns */
456         pthread_mutex_lock(&mutex);
457         allowed--;
458         pthread_mutex_unlock(&mutex);
459         return 0;
460 }
461
462