Fix bug in recycling jobs
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <signal.h>
23 #include <time.h>
24 #include <sys/syscall.h>
25 #include <pthread.h>
26 #include <errno.h>
27 #include <assert.h>
28
29 #include "jobs.h"
30 #include "sig-monitor.h"
31 #include "verbose.h"
32
33 /** control of threads */
34 struct thread
35 {
36         struct thread *next; /**< next thread of the list */
37         pthread_t tid;     /**< the thread id */
38         unsigned stop: 1;  /**< stop request */
39 };
40
41 /* describes pending job */
42 struct job
43 {
44         struct job *next;   /* link to the next job enqueued */
45         void *group;        /* group of the request */
46         void (*callback)(int,void*,void*,void*);     /* processing callback */
47         void *arg1;         /* first arg */
48         void *arg2;         /* second arg */
49         void *arg3;         /* second arg */
50         int timeout;        /* timeout in second for processing the request */
51         int blocked;        /* is an other request blocking this one ? */
52 };
53
54 /* synchronisation of threads */
55 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
56 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
57
58 /* count allowed, started and running threads */
59 static int allowed = 0; /** allowed count of threads */
60 static int started = 0; /** started count of threads */
61 static int running = 0; /** running count of threads */
62 static int remains = 0; /** remaining count of jobs that can be created */
63
64 /* list of threads */
65 static struct thread *threads;
66
67 /* queue of pending jobs */
68 static struct job *first_job;
69 static struct job *first_evloop;
70 static struct job *free_jobs;
71
72 /**
73  * Adds the 'job' at the end of the list of jobs, marking it
74  * as blocked if an other job with the same group is pending.
75  * @param job the job to add
76  */
77 static inline void job_add(struct job *job)
78 {
79         void *group;
80         struct job *ijob, **pjob;
81
82         pjob = &first_job;
83         ijob = first_job;
84         group = job->group ? : (void*)(intptr_t)1;
85         while (ijob) {
86                 if (ijob->group == group)
87                         job->blocked = 1;
88                 pjob = &ijob->next;
89                 ijob = ijob->next;
90         }
91         job->next = NULL;
92         *pjob = job;
93         remains--;
94 }
95
96 /**
97  * Get the next job to process or NULL if none.
98  * The returned job if any is removed from the list of
99  * jobs.
100  * @return the job to process
101  */
102 static inline struct job *job_get()
103 {
104         struct job *job, **pjob;
105
106         pjob = &first_job;
107         job = first_job;
108         while (job && job->blocked) {
109                 pjob = &job->next;
110                 job = job->next;
111         }
112         if (job) {
113                 *pjob = job->next;
114                 remains++;
115         }
116         return job;
117 }
118
119 /**
120  * Unblock the first pending job of a group (if any)
121  * @param group the group to unblock
122  */
123 static inline void job_unblock(void *group)
124 {
125         struct job *job;
126
127         job = first_job;
128         while (job) {
129                 if (job->group == group) {
130                         job->blocked = 0;
131                         break;
132                 }
133                 job = job->next;
134         }
135 }
136
137 static struct job *job_create(
138                 void *group,
139                 int timeout,
140                 void (*callback)(int, void*, void *, void*),
141                 void *arg1,
142                 void *arg2,
143                 void *arg3)
144 {
145         struct job *job;
146
147         /* allocates the job */
148         job = free_jobs;
149         if (job)
150                 free_jobs = job->next;
151         else {
152                 pthread_mutex_unlock(&mutex);
153                 job = malloc(sizeof *job);
154                 pthread_mutex_lock(&mutex);
155                 if (!job) {
156                         errno = -ENOMEM;
157                         goto end;
158                 }
159         }
160         job->group = group;
161         job->timeout = timeout;
162         job->callback = callback;
163         job->arg1 = arg1;
164         job->arg2 = arg2;
165         job->arg3 = arg3;
166         job->blocked = 0;
167 end:
168         return job;
169 }
170
171 static inline void job_destroy(struct job *job)
172 {
173         job->next = free_jobs;
174         free_jobs = job;
175 }
176
177 static inline void job_release(struct job *job)
178 {
179         if (job->group)
180                 job_unblock(job->group);
181         job_destroy(job);
182 }
183
184 /** monitored call to the job */
185 static void job_call(int signum, void *arg)
186 {
187         struct job *job = arg;
188         job->callback(signum, job->arg1, job->arg2, job->arg3);
189 }
190
191 /** monitored cancel of the job */
192 static void job_cancel(int signum, void *arg)
193 {
194         job_call(SIGABRT, arg);
195 }
196
197 /* main loop of processing threads */
198 static void *thread_main_loop(void *data)
199 {
200         struct thread me, **prv;
201         struct job *job;
202
203         /* init */
204         me.tid = pthread_self();
205         me.stop = 0;
206         sig_monitor_init_timeouts();
207
208         /* chain in */
209         pthread_mutex_lock(&mutex);
210         me.next = threads;
211         threads = &me;
212
213         /* loop until stopped */
214         running++;
215         while (!me.stop) {
216                 /* get a job */
217                 job = job_get();
218                 if (!job && first_job && running == 0) {
219                         /* sad situation!! should not happen */
220                         ERROR("threads are blocked!");
221                         job = first_job;
222                         first_job = job->next;
223                 }
224                 if (job) {
225                         /* run the job */
226                         pthread_mutex_unlock(&mutex);
227                         sig_monitor(job->timeout, job_call, job);
228                         pthread_mutex_lock(&mutex);
229                         job_release(job);
230                 } else {
231                         /* no job, check evloop */
232                         job = first_evloop;
233                         if (job) {
234                                 /* evloop */
235                                 first_evloop = job->next;
236                                 pthread_mutex_unlock(&mutex);
237                                 sig_monitor(job->timeout, job_call, job);
238                                 pthread_mutex_lock(&mutex);
239                                 job->next = first_evloop;
240                                 first_evloop = job;
241                         } else {
242                                 /* no job and not evloop */
243                                 running--;
244                                 pthread_cond_wait(&cond, &mutex);
245                                 running++;
246                         }
247                 }
248         }
249         running--;
250
251         /* chain out */
252         prv = &threads;
253         while (*prv != &me)
254                 prv = &(*prv)->next;
255         *prv = me.next;
256         pthread_mutex_unlock(&mutex);
257
258         /* uninit and terminate */
259         sig_monitor_clean_timeouts();
260         return NULL;
261 }
262
263 /* start a new thread */
264 static int start_one_thread()
265 {
266         pthread_t tid;
267         int rc;
268
269         assert(started < allowed);
270
271         started++;
272         rc = pthread_create(&tid, NULL, thread_main_loop, NULL);
273         if (rc != 0) {
274                 started--;
275                 errno = rc;
276                 WARNING("not able to start thread: %m");
277                 rc = -1;
278         }
279         return rc;
280 }
281
282 int jobs_queue(
283                 void *group,
284                 int timeout,
285                 void (*callback)(int, void*),
286                 void *arg)
287 {
288         return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg, NULL, NULL);
289 }
290
291 int jobs_queue2(
292                 void *group,
293                 int timeout,
294                 void (*callback)(int, void*, void*),
295                 void *arg1,
296                 void *arg2)
297 {
298         return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg1, arg2, NULL);
299 }
300
301 /* queue the job to the 'callback' using a separate thread if available */
302 int jobs_queue3(
303                 void *group,
304                 int timeout,
305                 void (*callback)(int, void*, void *, void*),
306                 void *arg1,
307                 void *arg2,
308                 void *arg3)
309 {
310         const char *info;
311         struct job *job;
312         int rc;
313
314         pthread_mutex_lock(&mutex);
315
316         /* allocates the job */
317         job = job_create(group, timeout, callback, arg1, arg2, arg3);
318         if (!job) {
319                 errno = ENOMEM;
320                 info = "out of memory";
321                 goto error;
322         }
323
324         /* start a thread if needed */
325         if (remains == 0) {
326                 errno = EBUSY;
327                 info = "too many jobs";
328                 goto error2;
329         }
330         if (started == running && started < allowed) {
331                 rc = start_one_thread();
332                 if (rc < 0 && started == 0) {
333                         /* failed to start threading */
334                         info = "can't start first thread";
335                         goto error2;
336                 }
337         }
338
339         /* queues the job */
340         job_add(job);
341         pthread_mutex_unlock(&mutex);
342
343         /* signal an existing job */
344         pthread_cond_signal(&cond);
345         return 0;
346
347 error2:
348         job_destroy(job);
349 error:
350         ERROR("can't process job with threads: %s, %m", info);
351         pthread_mutex_unlock(&mutex);
352         return -1;
353 }
354
355 /* initialise the threads */
356 int jobs_init(int allowed_count, int start_count, int waiter_count)
357 {
358         /* records the allowed count */
359         allowed = allowed_count;
360         started = 0;
361         running = 0;
362         remains = waiter_count;
363
364         /* start at least one thread */
365         pthread_mutex_lock(&mutex);
366         while (started < start_count && start_one_thread() == 0);
367         pthread_mutex_unlock(&mutex);
368
369         /* end */
370         return -(started != start_count);
371 }
372
373 /* terminate all the threads and all pending requests */
374 void jobs_terminate(int wait)
375 {
376         struct job *job;
377         pthread_t me, other;
378         struct thread *t;
379
380         /* how am i? */
381         me = pthread_self();
382
383         /* request all threads to stop */
384         pthread_mutex_lock(&mutex);
385         allowed = 0;
386         for(;;) {
387                 /* search the next thread to stop */
388                 t = threads;
389                 while (t && pthread_equal(t->tid, me))
390                         t = t->next;
391                 if (!t)
392                         break;
393                 /* stop it */
394                 other = t->tid;
395                 t->stop = 1;
396                 pthread_mutex_unlock(&mutex);
397                 pthread_cond_broadcast(&cond);
398                 pthread_join(other, NULL);
399                 pthread_mutex_lock(&mutex);
400         }
401
402         /* cancel pending jobs */
403         while (first_job) {
404                 job = first_job;
405                 first_job = job->next;
406                 sig_monitor(0, job_cancel, job);
407                 free(job);
408         }
409 }
410
411 int jobs_add_event_loop(void *key, int timeout, void (*evloop)(int signum, void*), void *closure)
412 {
413         struct job *job;
414
415         pthread_mutex_lock(&mutex);
416         job = job_create(key, timeout, (void (*)(int,  void *, void *, void *))evloop, closure, NULL, NULL);
417         if (job) {
418                 /* adds the loop */
419                 job->next = first_evloop;
420                 first_evloop = job;
421
422                 /* signal the loop */
423                 pthread_cond_signal(&cond);
424         }
425         pthread_mutex_unlock(&mutex);
426         return -!job;
427 }
428
429 int jobs_add_me()
430 {
431         pthread_t me;
432         struct thread *t;
433
434         /* how am i? */
435         me = pthread_self();
436
437         /* request all threads to stop */
438         pthread_mutex_lock(&mutex);
439         t = threads;
440         while (t) {
441                 if (pthread_equal(t->tid, me)) {
442                         pthread_mutex_unlock(&mutex);
443                         ERROR("thread already running");
444                         errno = EINVAL;
445                         return -1;
446                 }
447                 t = t->next;
448         }
449
450         /* allowed... */
451         allowed++;
452         pthread_mutex_unlock(&mutex);
453
454         /* run */
455         thread_main_loop(NULL);
456
457         /* returns */
458         pthread_mutex_lock(&mutex);
459         allowed--;
460         pthread_mutex_unlock(&mutex);
461         return 0;
462 }
463
464