Refactor job to allow synchronous calls
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <signal.h>
23 #include <time.h>
24 #include <sys/syscall.h>
25 #include <pthread.h>
26 #include <errno.h>
27 #include <assert.h>
28
29 #include "jobs.h"
30 #include "sig-monitor.h"
31 #include "verbose.h"
32
33 /* describes pending job */
34 struct job
35 {
36         struct job *next;   /* link to the next job enqueued */
37         void *group;        /* group of the request */
38         void (*callback)(int,void*,void*,void*);     /* processing callback */
39         void *arg1;         /* first arg */
40         void *arg2;         /* second arg */
41         void *arg3;         /* second arg */
42         int timeout;        /* timeout in second for processing the request */
43         int blocked;        /* is an other request blocking this one ? */
44 };
45
46 /** control of threads */
47 struct thread
48 {
49         struct thread *next;  /**< next thread of the list */
50         struct thread *upper; /**< upper same thread */
51         struct job *job;      /**< currently processed job */
52         pthread_t tid;        /**< the thread id */
53         unsigned stop: 1;     /**< stop requested */
54         unsigned lowered: 1;  /**< has a lower same thread */
55 };
56
57 /* synchronisation of threads */
58 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
59 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
60
61 /* count allowed, started and running threads */
62 static int allowed = 0; /** allowed count of threads */
63 static int started = 0; /** started count of threads */
64 static int running = 0; /** running count of threads */
65 static int remains = 0; /** remaining count of jobs that can be created */
66
67 /* list of threads */
68 static struct thread *threads;
69 static _Thread_local struct thread *current;
70
71 /* queue of pending jobs */
72 static struct job *first_job;
73 static struct job *first_evloop;
74 static struct job *free_jobs;
75
76 /**
77  * Create a new job with the given parameters
78  * @param group the group of the job
79  * @param timeout the timeout of the job (0 if none)
80  * @param callback the function that achieves the job
81  * @param arg1 the first argument of the callback
82  * @param arg2 the second argument of the callback
83  * @param arg3 the third argument of the callback
84  * @return the created job unblock or NULL when no more memory
85  */
86 static struct job *job_create(
87                 void *group,
88                 int timeout,
89                 void (*callback)(int, void*, void *, void*),
90                 void *arg1,
91                 void *arg2,
92                 void *arg3)
93 {
94         struct job *job;
95
96         /* try recyle existing job */
97         job = free_jobs;
98         if (job)
99                 free_jobs = job->next;
100         else {
101                 /* allocation  without blocking */
102                 pthread_mutex_unlock(&mutex);
103                 job = malloc(sizeof *job);
104                 pthread_mutex_lock(&mutex);
105                 if (!job) {
106                         errno = -ENOMEM;
107                         goto end;
108                 }
109         }
110         /* initialises the job */
111         job->group = group;
112         job->timeout = timeout;
113         job->callback = callback;
114         job->arg1 = arg1;
115         job->arg2 = arg2;
116         job->arg3 = arg3;
117         job->blocked = 0;
118 end:
119         return job;
120 }
121
122 /**
123  * Adds 'job1' and 'job2' at the end of the list of jobs, marking it
124  * as blocked if an other job with the same group is pending.
125  * @param job1 the first job to add
126  * @param job2 the second job to add or NULL
127  */
128 static void job_add2(struct job *job1, struct job *job2)
129 {
130         void *group1, *group2, *group;
131         struct job *ijob, **pjob;
132
133         /* prepare to add */
134         group1 = job1->group;
135         job1->next = job2;
136         if (!job2)
137                 group2 = NULL;
138         else {
139                 job2->next = NULL;
140                 group2 = job2->group;
141                 if (group2 && group2 == group1)
142                         job2->blocked = 1;
143         }
144
145         /* search end and blackers */
146         pjob = &first_job;
147         ijob = first_job;
148         while (ijob) {
149                 group = ijob->group;
150                 if (group) {
151                         if (group == group1)
152                                 job1->blocked = 1;
153                         if (group == group2)
154                                 job2->blocked = 1;
155                 }
156                 pjob = &ijob->next;
157                 ijob = ijob->next;
158         }
159
160         /* queue the jobs */
161         *pjob = job1;
162 }
163
164 /**
165  * Get the next job to process or NULL if none.
166  * The returned job if any isn't removed from
167  * the list of jobs.
168  * @return the job to process
169  */
170 static inline struct job *job_get()
171 {
172         struct job *job;
173
174         job = first_job;
175         while (job && job->blocked)
176                 job = job->next;
177         return job;
178 }
179
180 /**
181  * Releases the processed 'job'
182  * @param job the job to release
183  */
184 static inline void job_release(struct job *job)
185 {
186         struct job *ijob, **pjob;
187         void *group;
188
189         /* first unqueue the job */
190         pjob = &first_job;
191         ijob = first_job;
192         while (ijob != job) {
193                 pjob = &ijob->next;
194                 ijob = ijob->next;
195         }
196         *pjob = job->next;
197
198         /* then unblock jobs of the same group */
199         group = job->group;
200         if (group) {
201                 ijob = job->next;
202                 while (ijob && ijob->group != group)
203                         ijob = ijob->next;
204                 if (ijob)
205                         ijob->blocked = 0;
206         }
207
208         /* recycle the job */
209         job->next = free_jobs;
210         free_jobs = job;
211 }
212
213 /** monitored call to the job */
214 static void job_call(int signum, void *arg)
215 {
216         struct job *job = arg;
217         job->callback(signum, job->arg1, job->arg2, job->arg3);
218 }
219
220 /** monitored cancel of the job */
221 static void job_cancel(int signum, void *arg)
222 {
223         job_call(SIGABRT, arg);
224 }
225
226 /* main loop of processing threads */
227 static void thread_run(struct thread *me)
228 {
229         struct thread **prv;
230         struct job *job;
231
232         /* init */
233         me->tid = pthread_self();
234         me->stop = 0;
235         me->lowered = 0;
236         me->upper = current;
237         if (current)
238                 current->lowered = 1;
239         else
240                 sig_monitor_init_timeouts();
241         current = me;
242         me->next = threads;
243         threads = me;
244
245         /* loop until stopped */
246         running++;
247         while (!me->stop) {
248                 /* get a job */
249                 job = job_get();
250                 if (!job && first_job && running == 0) {
251                         /* sad situation!! should not happen */
252                         ERROR("threads are blocked!");
253                         job = first_job;
254                         first_job = job->next;
255                 }
256                 if (job) {
257                         /* run the job */
258                         remains++;
259                         job->blocked = 1;
260                         me->job = job;
261                         pthread_mutex_unlock(&mutex);
262                         sig_monitor(job->timeout, job_call, job);
263                         pthread_mutex_lock(&mutex);
264                         job_release(job);
265                 } else {
266                         /* no job, check evloop */
267                         job = first_evloop;
268                         if (job) {
269                                 /* evloop */
270                                 first_evloop = job->next;
271                                 pthread_mutex_unlock(&mutex);
272                                 sig_monitor(job->timeout, job_call, job);
273                                 pthread_mutex_lock(&mutex);
274                                 job->next = first_evloop;
275                                 first_evloop = job;
276                         } else {
277                                 /* no job and not evloop */
278                                 running--;
279                                 pthread_cond_wait(&cond, &mutex);
280                                 running++;
281                         }
282                 }
283         }
284         running--;
285
286         /* uninit */
287         prv = &threads;
288         while (*prv != me)
289                 prv = &(*prv)->next;
290         *prv = me->next;
291         current = me->upper;
292         if (current)
293                 current->lowered = 0;
294         else
295                 sig_monitor_clean_timeouts();
296         pthread_mutex_unlock(&mutex);
297 }
298
299 /* main loop of processing threads */
300 static void *thread_create(void *data)
301 {
302         struct thread me;
303
304         pthread_mutex_lock(&mutex);
305         thread_run(&me);
306         pthread_mutex_unlock(&mutex);
307         return NULL;
308 }
309
310 /* start a new thread */
311 static int start_one_thread()
312 {
313         pthread_t tid;
314         int rc;
315
316         assert(started < allowed);
317
318         started++;
319         rc = pthread_create(&tid, NULL, thread_create, NULL);
320         if (rc != 0) {
321                 started--;
322                 errno = rc;
323                 WARNING("not able to start thread: %m");
324                 rc = -1;
325         }
326         return rc;
327 }
328
329 static int start_one_thread_if_needed()
330 {
331         int rc;
332
333         if (started == running && started < allowed) {
334                 /* all threads are busy and a new can be started */
335                 rc = start_one_thread();
336                 if (rc < 0 && started == 0)
337                         return rc; /* no thread available */
338         }
339         return 0;
340 }
341
342 int jobs_queue0(
343                 void *group,
344                 int timeout,
345                 void (*callback)(int signum))
346 {
347         return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, NULL, NULL, NULL);
348 }
349
350 int jobs_queue(
351                 void *group,
352                 int timeout,
353                 void (*callback)(int, void*),
354                 void *arg)
355 {
356         return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg, NULL, NULL);
357 }
358
359 int jobs_queue2(
360                 void *group,
361                 int timeout,
362                 void (*callback)(int, void*, void*),
363                 void *arg1,
364                 void *arg2)
365 {
366         return jobs_queue3(group, timeout, (void(*)(int,void*,void*,void*))callback, arg1, arg2, NULL);
367 }
368
369 /* queue the job to the 'callback' using a separate thread if available */
370 int jobs_queue3(
371                 void *group,
372                 int timeout,
373                 void (*callback)(int, void*, void *, void*),
374                 void *arg1,
375                 void *arg2,
376                 void *arg3)
377 {
378         const char *info;
379         struct job *job;
380         int rc;
381
382         pthread_mutex_lock(&mutex);
383
384         /* allocates the job */
385         job = job_create(group, timeout, callback, arg1, arg2, arg3);
386         if (!job) {
387                 errno = ENOMEM;
388                 info = "out of memory";
389                 goto error;
390         }
391
392         /* check availability */
393         if (remains == 0) {
394                 errno = EBUSY;
395                 info = "too many jobs";
396                 goto error2;
397         }
398
399         /* start a thread if needed */
400         rc = start_one_thread_if_needed();
401         if (rc < 0) {
402                 /* failed to start threading */
403                 info = "can't start first thread";
404                 goto error2;
405         }
406
407         /* queues the job */
408         remains--;
409         job_add2(job, NULL);
410         pthread_mutex_unlock(&mutex);
411
412         /* signal an existing job */
413         pthread_cond_signal(&cond);
414         return 0;
415
416 error2:
417         job->next = free_jobs;
418         free_jobs = job;
419 error:
420         ERROR("can't process job with threads: %s, %m", info);
421         pthread_mutex_unlock(&mutex);
422         return -1;
423 }
424
425 /* initialise the threads */
426 int jobs_init(int allowed_count, int start_count, int waiter_count)
427 {
428         /* records the allowed count */
429         allowed = allowed_count;
430         started = 0;
431         running = 0;
432         remains = waiter_count;
433
434         /* start at least one thread */
435         pthread_mutex_lock(&mutex);
436         while (started < start_count && start_one_thread() == 0);
437         pthread_mutex_unlock(&mutex);
438
439         /* end */
440         return -(started != start_count);
441 }
442
443 int jobs_invoke0(
444                 int timeout,
445                 void (*callback)(int signum))
446 {
447         return jobs_invoke3(timeout, (void(*)(int,void*,void*,void*))callback, NULL, NULL, NULL);
448 }
449
450 int jobs_invoke(
451                 int timeout,
452                 void (*callback)(int, void*),
453                 void *arg)
454 {
455         return jobs_invoke3(timeout, (void(*)(int,void*,void*,void*))callback, arg, NULL, NULL);
456 }
457
458 int jobs_invoke2(
459                 int timeout,
460                 void (*callback)(int, void*, void*),
461                 void *arg1,
462                 void *arg2)
463 {
464         return jobs_invoke3(timeout, (void(*)(int,void*,void*,void*))callback, arg1, arg2, NULL);
465 }
466
467 static void unlock_invoker(int signum, void *arg1, void *arg2, void *arg3)
468 {
469         struct thread *t = arg1;
470         pthread_mutex_lock(&mutex);
471         t->stop = 1;
472         pthread_mutex_unlock(&mutex);
473 }
474
475 /* invoke the job to the 'callback' using a separate thread if available */
476 int jobs_invoke3(
477                 int timeout,
478                 void (*callback)(int, void*, void *, void*),
479                 void *arg1,
480                 void *arg2,
481                 void *arg3)
482 {
483         const char *info;
484         struct job *job1, *job2;
485         int rc;
486         struct thread me;
487         
488         pthread_mutex_lock(&mutex);
489
490         /* allocates the job */
491         job1 = job_create(&me, timeout, callback, arg1, arg2, arg3);
492         job2 = job_create(&me, 0, unlock_invoker, &me, NULL, NULL);
493         if (!job1 || !job2) {
494                 errno = ENOMEM;
495                 info = "out of memory";
496                 goto error;
497         }
498
499         /* start a thread if needed */
500         rc = start_one_thread_if_needed();
501         if (rc < 0) {
502                 /* failed to start threading */
503                 info = "can't start first thread";
504                 goto error;
505         }
506
507         /* queues the job */
508         job_add2(job1, job2);
509
510         /* run untill stopped */
511         thread_run(&me);
512         pthread_mutex_unlock(&mutex);
513         return 0;
514
515 error:
516         if (job1) {
517                 job1->next = free_jobs;
518                 free_jobs = job1;
519         }
520         if (job2) {
521                 job2->next = free_jobs;
522                 free_jobs = job2;
523         }
524         ERROR("can't process job with threads: %s, %m", info);
525         pthread_mutex_unlock(&mutex);
526         return -1;
527 }
528
529 /* terminate all the threads and all pending requests */
530 void jobs_terminate()
531 {
532         struct job *job, *head, *tail;
533         pthread_t me, other;
534         struct thread *t;
535
536         /* how am i? */
537         me = pthread_self();
538
539         /* request all threads to stop */
540         pthread_mutex_lock(&mutex);
541         allowed = 0;
542         for(;;) {
543                 /* search the next thread to stop */
544                 t = threads;
545                 while (t && pthread_equal(t->tid, me))
546                         t = t->next;
547                 if (!t)
548                         break;
549                 /* stop it */
550                 other = t->tid;
551                 t->stop = 1;
552                 pthread_mutex_unlock(&mutex);
553                 pthread_cond_broadcast(&cond);
554                 pthread_join(other, NULL);
555                 pthread_mutex_lock(&mutex);
556         }
557
558         /* cancel pending jobs of other threads */
559         head = first_job;
560         first_job = NULL;
561         tail = NULL;
562         while (head) {
563                 /* unlink the job */
564                 job = head;
565                 head = job->next;
566
567                 /* search if job is stacked for current */
568                 t = current;
569                 while (t && t->job != job)
570                         t = t->upper;
571                 if (t) {
572                         /* yes, relink it at end */
573                         if (tail)
574                                 tail->next = job;
575                         else
576                                 first_job = job;
577                         tail = job;
578                         job->next = NULL;
579                 } else {
580                         /* no cancel the job */
581                         pthread_mutex_unlock(&mutex);
582                         sig_monitor(0, job_cancel, job);
583                         free(job);
584                         pthread_mutex_lock(&mutex);
585                 }
586         }
587         pthread_mutex_unlock(&mutex);
588 }
589
590 int jobs_add_event_loop(void *key, int timeout, void (*evloop)(int signum, void*), void *closure)
591 {
592         struct job *job;
593
594         pthread_mutex_lock(&mutex);
595         job = job_create(key, timeout, (void (*)(int,  void *, void *, void *))evloop, closure, NULL, NULL);
596         if (job) {
597                 /* adds the loop */
598                 job->next = first_evloop;
599                 first_evloop = job;
600
601                 /* signal the loop */
602                 pthread_cond_signal(&cond);
603         }
604         pthread_mutex_unlock(&mutex);
605         return -!job;
606 }
607
608 int jobs_add_me()
609 {
610         struct thread me;
611
612         /* check whether already running */
613         if (current) {
614                 ERROR("thread already running");
615                 errno = EINVAL;
616                 return -1;
617         }
618
619         /* allowed... */
620         pthread_mutex_lock(&mutex);
621         allowed++;
622         thread_run(&me);
623         allowed--;
624         pthread_mutex_unlock(&mutex);
625         return 0;
626 }
627
628