afb-proto-ws: Fix autolock in proto-ws
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <stdint.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <time.h>
25 #include <sys/syscall.h>
26 #include <pthread.h>
27 #include <errno.h>
28 #include <assert.h>
29
30 #include <systemd/sd-event.h>
31
32 #include "jobs.h"
33 #include "sig-monitor.h"
34 #include "verbose.h"
35
36 #if 0
37 #define _alert_ "do you really want to remove monitoring?"
38 #define sig_monitor_init_timeouts()  ((void)0)
39 #define sig_monitor_clean_timeouts() ((void)0)
40 #define sig_monitor(to,cb,arg)       (cb(0,arg))
41 #endif
42
43 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
44 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
45
46 /** Internal shortcut for callback */
47 typedef void (*job_cb_t)(int, void*);
48
49 /** Description of a pending job */
50 struct job
51 {
52         struct job *next;    /**< link to the next job enqueued */
53         const void *group;   /**< group of the request */
54         job_cb_t callback;   /**< processing callback */
55         void *arg;           /**< argument */
56         int timeout;         /**< timeout in second for processing the request */
57         unsigned blocked: 1; /**< is an other request blocking this one ? */
58         unsigned dropped: 1; /**< is removed ? */
59 };
60
61 /** Description of handled event loops */
62 struct events
63 {
64         struct events *next;
65         struct sd_event *event;
66         uint64_t timeout;
67         enum {
68                 Available,
69                 Modifiable,
70                 Locked
71         } state;
72 };
73
74 /** Description of threads */
75 struct thread
76 {
77         struct thread *next;   /**< next thread of the list */
78         struct thread *upper;  /**< upper same thread */
79         struct job *job;       /**< currently processed job */
80         pthread_t tid;         /**< the thread id */
81         unsigned stop: 1;      /**< stop requested */
82         unsigned waits: 1;     /**< is waiting? */
83 };
84
85 /**
86  * Description of synchonous callback
87  */
88 struct sync
89 {
90         struct thread thread;   /**< thread loop data */
91         union {
92                 void (*callback)(int, void*);   /**< the synchronous callback */
93                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
94                                 /**< the entering synchronous routine */
95         };
96         void *arg;              /**< the argument of the callback */
97 };
98
99
100 /* synchronisation of threads */
101 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
102 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
103
104 /* count allowed, started and waiting threads */
105 static int allowed = 0; /** allowed count of threads */
106 static int started = 0; /** started count of threads */
107 static int waiting = 0; /** waiting count of threads */
108 static int remains = 0; /** allowed count of waiting jobs */
109 static int nevents = 0; /** count of events */
110
111 /* list of threads */
112 static struct thread *threads;
113 static _Thread_local struct thread *current_thread;
114 static _Thread_local struct events *current_events;
115
116 /* queue of pending jobs */
117 static struct job *first_job;
118 static struct events *first_events;
119 static struct job *free_jobs;
120
121 /**
122  * Create a new job with the given parameters
123  * @param group    the group of the job
124  * @param timeout  the timeout of the job (0 if none)
125  * @param callback the function that achieves the job
126  * @param arg      the argument of the callback
127  * @return the created job unblock or NULL when no more memory
128  */
129 static struct job *job_create(
130                 const void *group,
131                 int timeout,
132                 job_cb_t callback,
133                 void *arg)
134 {
135         struct job *job;
136
137         /* try recyle existing job */
138         job = free_jobs;
139         if (job)
140                 free_jobs = job->next;
141         else {
142                 /* allocation  without blocking */
143                 pthread_mutex_unlock(&mutex);
144                 job = malloc(sizeof *job);
145                 pthread_mutex_lock(&mutex);
146                 if (!job) {
147                         errno = -ENOMEM;
148                         goto end;
149                 }
150         }
151         /* initialises the job */
152         job->group = group;
153         job->timeout = timeout;
154         job->callback = callback;
155         job->arg = arg;
156         job->blocked = 0;
157         job->dropped = 0;
158 end:
159         return job;
160 }
161
162 /**
163  * Adds 'job' at the end of the list of jobs, marking it
164  * as blocked if an other job with the same group is pending.
165  * @param job the job to add
166  */
167 static void job_add(struct job *job)
168 {
169         const void *group;
170         struct job *ijob, **pjob;
171
172         /* prepare to add */
173         group = job->group;
174         job->next = NULL;
175
176         /* search end and blockers */
177         pjob = &first_job;
178         ijob = first_job;
179         while (ijob) {
180                 if (group && ijob->group == group)
181                         job->blocked = 1;
182                 pjob = &ijob->next;
183                 ijob = ijob->next;
184         }
185
186         /* queue the jobs */
187         *pjob = job;
188 }
189
190 /**
191  * Get the next job to process or NULL if none.
192  * @return the first job that isn't blocked or NULL
193  */
194 static inline struct job *job_get()
195 {
196         struct job *job = first_job;
197         while (job && job->blocked)
198                 job = job->next;
199         return job;
200 }
201
202 /**
203  * Get the next events to process or NULL if none.
204  * @return the first events that isn't running or NULL
205  */
206 static inline struct events *events_get()
207 {
208         struct events *events = first_events;
209         while (events && events->state != Available)
210                 events = events->next;
211         return events;
212 }
213
214 /**
215  * Releases the processed 'job': removes it
216  * from the list of jobs and unblock the first
217  * pending job of the same group if any.
218  * @param job the job to release
219  */
220 static inline void job_release(struct job *job)
221 {
222         struct job *ijob, **pjob;
223         const void *group;
224
225         /* first unqueue the job */
226         pjob = &first_job;
227         ijob = first_job;
228         while (ijob != job) {
229                 pjob = &ijob->next;
230                 ijob = ijob->next;
231         }
232         *pjob = job->next;
233
234         /* then unblock jobs of the same group */
235         group = job->group;
236         if (group) {
237                 ijob = job->next;
238                 while (ijob && ijob->group != group)
239                         ijob = ijob->next;
240                 if (ijob)
241                         ijob->blocked = 0;
242         }
243
244         /* recycle the job */
245         job->next = free_jobs;
246         free_jobs = job;
247 }
248
249 /**
250  * Monitored cancel callback for a job.
251  * This function is called by the monitor
252  * to cancel the job when the safe environment
253  * is set.
254  * @param signum 0 on normal flow or the number
255  *               of the signal that interrupted the normal
256  *               flow, isn't used
257  * @param arg    the job to run
258  */
259 static void job_cancel(int signum, void *arg)
260 {
261         struct job *job = arg;
262         job->callback(SIGABRT, job->arg);
263 }
264
265 /**
266  * Monitored normal callback for events.
267  * This function is called by the monitor
268  * to run the event loop when the safe environment
269  * is set.
270  * @param signum 0 on normal flow or the number
271  *               of the signal that interrupted the normal
272  *               flow
273  * @param arg     the events to run
274  */
275 static void events_call(int signum, void *arg)
276 {
277         int rc;
278         struct sd_event *se;
279         struct events *events = arg;
280
281         if (!signum) {
282                 se = events->event;
283                 rc = sd_event_prepare(se);
284                 if (rc < 0) {
285                         errno = -rc;
286                         ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(events->event));
287                 } else {
288                         if (rc == 0) {
289                                 rc = sd_event_wait(se, events->timeout);
290                                 if (rc < 0) {
291                                         errno = -rc;
292                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(events->event));
293                                 }
294                         }
295
296                         if (rc > 0) {
297                                 rc = sd_event_dispatch(se);
298                                 if (rc < 0) {
299                                         errno = -rc;
300                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(events->event));
301                                 }
302                         }
303                 }
304         }
305 }
306
307 /**
308  * Main processing loop of threads processing jobs.
309  * The loop must be called with the mutex locked
310  * and it returns with the mutex locked.
311  * @param me the description of the thread to use
312  * TODO: how are timeout handled when reentering?
313  */
314 static void thread_run(volatile struct thread *me)
315 {
316         struct thread **prv;
317         struct job *job;
318         struct events *events;
319         uint64_t evto;
320
321         /* initialize description of itself and link it in the list */
322         me->tid = pthread_self();
323         me->stop = 0;
324         me->waits = 0;
325         me->upper = current_thread;
326         if (current_thread) {
327                 evto = EVENT_TIMEOUT_CHILD;
328         } else {
329                 started++;
330                 sig_monitor_init_timeouts();
331                 evto = EVENT_TIMEOUT_TOP;
332         }
333         me->next = threads;
334         threads = (struct thread*)me;
335         current_thread = (struct thread*)me;
336
337         /* loop until stopped */
338         while (!me->stop) {
339                 /* get a job */
340                 job = job_get(first_job);
341                 if (job) {
342                         /* prepare running the job */
343                         remains++; /* increases count of job that can wait */
344                         job->blocked = 1; /* mark job as blocked */
345                         me->job = job; /* record the job (only for terminate) */
346
347                         /* run the job */
348                         pthread_mutex_unlock(&mutex);
349                         sig_monitor(job->timeout, job->callback, job->arg);
350                         pthread_mutex_lock(&mutex);
351
352                         /* release event if any */
353                         events = current_events;
354                         if (events && events->state == Modifiable) {
355                                 current_events = NULL;
356                                 events->state = Available;
357                         }
358
359                         /* release the run job */
360                         job_release(job);
361                 } else {
362                         /* no job, check events */
363                         events = current_events;
364                         if (!events)
365                                 events = events_get();
366                         else if (events->state == Locked) {
367                                 events = 0;
368                                 AFB_WARNING("Loosing an event loop because reentering");
369                         }
370                         if (events) {
371                                 /* run the events */
372                                 events->state = Locked;
373                                 events->timeout = evto;
374                                 current_events = events;
375                                 pthread_mutex_unlock(&mutex);
376                                 sig_monitor(0, events_call, events);
377                                 pthread_mutex_lock(&mutex);
378                                 current_events = NULL;
379                                 events->state = Available;
380                         } else {
381                                 /* no job and not events */
382                                 waiting++;
383                                 me->waits = 1;
384                                 pthread_cond_wait(&cond, &mutex);
385                                 me->waits = 0;
386                                 waiting--;
387                         }
388                 }
389         }
390
391         /* unlink the current thread and cleanup */
392         prv = &threads;
393         while (*prv != me)
394                 prv = &(*prv)->next;
395         *prv = me->next;
396         current_thread = me->upper;
397         if (!current_thread) {
398                 sig_monitor_clean_timeouts();
399                 started--;
400         }
401 }
402
403 /**
404  * Entry point for created threads.
405  * @param data not used
406  * @return NULL
407  */
408 static void *thread_main(void *data)
409 {
410         struct thread me;
411
412         pthread_mutex_lock(&mutex);
413         thread_run(&me);
414         pthread_mutex_unlock(&mutex);
415         return NULL;
416 }
417
418 /**
419  * Starts a new thread
420  * @return 0 in case of success or -1 in case of error
421  */
422 static int start_one_thread()
423 {
424         pthread_t tid;
425         int rc;
426
427         rc = pthread_create(&tid, NULL, thread_main, NULL);
428         if (rc != 0) {
429                 /* errno = rc; */
430                 WARNING("not able to start thread: %m");
431                 rc = -1;
432         }
433         return rc;
434 }
435
436 /**
437  * Queues a new asynchronous job represented by 'callback' and 'arg'
438  * for the 'group' and the 'timeout'.
439  * Jobs are queued FIFO and are possibly executed in parallel
440  * concurrently except for job of the same group that are
441  * executed sequentially in FIFO order.
442  * @param group    The group of the job or NULL when no group.
443  * @param timeout  The maximum execution time in seconds of the job
444  *                 or 0 for unlimited time.
445  * @param callback The function to execute for achieving the job.
446  *                 Its first parameter is either 0 on normal flow
447  *                 or the signal number that broke the normal flow.
448  *                 The remaining parameter is the parameter 'arg1'
449  *                 given here.
450  * @param arg      The second argument for 'callback'
451  * @return 0 in case of success or -1 in case of error
452  */
453 int jobs_queue(
454                 const void *group,
455                 int timeout,
456                 void (*callback)(int, void*),
457                 void *arg)
458 {
459         const char *info;
460         struct job *job;
461         int rc;
462
463         pthread_mutex_lock(&mutex);
464
465         /* allocates the job */
466         job = job_create(group, timeout, callback, arg);
467         if (!job) {
468                 errno = ENOMEM;
469                 info = "out of memory";
470                 goto error;
471         }
472
473         /* check availability */
474         if (remains == 0) {
475                 errno = EBUSY;
476                 info = "too many jobs";
477                 goto error2;
478         }
479
480         /* start a thread if needed */
481         if (waiting == 0 && started < allowed) {
482                 /* all threads are busy and a new can be started */
483                 rc = start_one_thread();
484                 if (rc < 0 && started == 0) {
485                         info = "can't start first thread";
486                         goto error2;
487                 }
488         }
489
490         /* queues the job */
491         remains--;
492         job_add(job);
493
494         /* signal an existing job */
495         pthread_cond_signal(&cond);
496         pthread_mutex_unlock(&mutex);
497         return 0;
498
499 error2:
500         job->next = free_jobs;
501         free_jobs = job;
502 error:
503         ERROR("can't process job with threads: %s, %m", info);
504         pthread_mutex_unlock(&mutex);
505         return -1;
506 }
507
508 /**
509  * Internal helper function for 'jobs_enter'.
510  * @see jobs_enter, jobs_leave
511  */
512 static void enter_cb(int signum, void *closure)
513 {
514         struct sync *sync = closure;
515         sync->enter(signum, sync->arg, (void*)&sync->thread);
516 }
517
518 /**
519  * Internal helper function for 'jobs_call'.
520  * @see jobs_call
521  */
522 static void call_cb(int signum, void *closure)
523 {
524         struct sync *sync = closure;
525         sync->callback(signum, sync->arg);
526         jobs_leave((void*)&sync->thread);
527 }
528
529 /**
530  * Internal helper for synchronous jobs. It enters
531  * a new thread loop for evaluating the given job
532  * as recorded by the couple 'sync_cb' and 'sync'.
533  * @see jobs_call, jobs_enter, jobs_leave
534  */
535 static int do_sync(
536                 const void *group,
537                 int timeout,
538                 void (*sync_cb)(int signum, void *closure),
539                 struct sync *sync
540 )
541 {
542         struct job *job;
543
544         pthread_mutex_lock(&mutex);
545
546         /* allocates the job */
547         job = job_create(group, timeout, sync_cb, sync);
548         if (!job) {
549                 ERROR("out of memory");
550                 errno = ENOMEM;
551                 pthread_mutex_unlock(&mutex);
552                 return -1;
553         }
554
555         /* queues the job */
556         job_add(job);
557
558         /* run until stopped */
559         thread_run(&sync->thread);
560         pthread_mutex_unlock(&mutex);
561         return 0;
562 }
563
564 /**
565  * Enter a synchronisation point: activates the job given by 'callback'
566  * and 'closure' using 'group' and 'timeout' to control sequencing and
567  * execution time.
568  * @param group the group for sequencing jobs
569  * @param timeout the time in seconds allocated to the job
570  * @param callback the callback that will handle the job.
571  *                 it receives 3 parameters: 'signum' that will be 0
572  *                 on normal flow or the catched signal number in case
573  *                 of interrupted flow, the context 'closure' as given and
574  *                 a 'jobloop' reference that must be used when the job is
575  *                 terminated to unlock the current execution flow.
576  * @param arg the argument to the callback
577  * @return 0 on success or -1 in case of error
578  */
579 int jobs_enter(
580                 const void *group,
581                 int timeout,
582                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
583                 void *closure
584 )
585 {
586         struct sync sync;
587
588         sync.enter = callback;
589         sync.arg = closure;
590         return do_sync(group, timeout, enter_cb, &sync);
591 }
592
593 /**
594  * Unlocks the execution flow designed by 'jobloop'.
595  * @param jobloop indication of the flow to unlock
596  * @return 0 in case of success of -1 on error
597  */
598 int jobs_leave(struct jobloop *jobloop)
599 {
600         struct thread *t;
601
602         pthread_mutex_lock(&mutex);
603         t = threads;
604         while (t && t != (struct thread*)jobloop)
605                 t = t->next;
606         if (!t) {
607                 errno = EINVAL;
608         } else {
609                 t->stop = 1;
610                 if (t->waits)
611                         pthread_cond_broadcast(&cond);
612         }
613         pthread_mutex_unlock(&mutex);
614         return -!t;
615 }
616
617 /**
618  * Calls synchronously the job represented by 'callback' and 'arg1'
619  * for the 'group' and the 'timeout' and waits for its completion.
620  * @param group    The group of the job or NULL when no group.
621  * @param timeout  The maximum execution time in seconds of the job
622  *                 or 0 for unlimited time.
623  * @param callback The function to execute for achieving the job.
624  *                 Its first parameter is either 0 on normal flow
625  *                 or the signal number that broke the normal flow.
626  *                 The remaining parameter is the parameter 'arg1'
627  *                 given here.
628  * @param arg      The second argument for 'callback'
629  * @return 0 in case of success or -1 in case of error
630  */
631 int jobs_call(
632                 const void *group,
633                 int timeout,
634                 void (*callback)(int, void*),
635                 void *arg)
636 {
637         struct sync sync;
638
639         sync.callback = callback;
640         sync.arg = arg;
641
642         return do_sync(group, timeout, call_cb, &sync);
643 }
644
645 /**
646  * Gets a sd_event item for the current thread.
647  * @return a sd_event or NULL in case of error
648  */
649 struct sd_event *jobs_get_sd_event()
650 {
651         struct events *events;
652         int rc;
653
654         pthread_mutex_lock(&mutex);
655
656         /* search events on stack */
657         events = current_events;
658         if (!events) {
659                 /* search an available events */
660                 events = events_get();
661                 if (!events) {
662                         /* not found, check if creation possible */
663                         if (nevents >= allowed) {
664                                 ERROR("not possible to add a new event");
665                                 events = NULL;
666                         } else {
667                                 events = malloc(sizeof *events);
668                                 if (events && (rc = sd_event_new(&events->event)) >= 0) {
669                                         if (nevents < started || start_one_thread() >= 0) {
670                                                 events->state = Available;
671                                                 events->next = first_events;
672                                                 first_events = events;
673                                         } else {
674                                                 ERROR("can't start thread for events");
675                                                 sd_event_unref(events->event);
676                                                 free(events);
677                                                 events = NULL;
678                                         }
679                                 } else {
680                                         if (!events) {
681                                                 ERROR("out of memory");
682                                                 errno = ENOMEM;
683                                         } else {
684                                                 free(events);
685                                                 ERROR("creation of sd_event failed: %m");
686                                                 events = NULL;
687                                                 errno = -rc;
688                                         }
689                                 }
690                         }
691                 }
692                 if (events) {
693                         events->state = Modifiable;
694                         if (!current_thread)
695                                 WARNING("event returned for unknown thread!");
696                         current_events = events;
697                 }
698         }
699         pthread_mutex_unlock(&mutex);
700         return events ? events->event : NULL;
701 }
702
703 /**
704  * Enter the jobs processing loop.
705  * @param allowed_count Maximum count of thread for jobs including this one
706  * @param start_count   Count of thread to start now, must be lower.
707  * @param waiter_count  Maximum count of jobs that can be waiting.
708  * @param start         The start routine to activate (can't be NULL)
709  * @return 0 in case of success or -1 in case of error.
710  */
711 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum))
712 {
713         int rc, launched;
714         struct thread me;
715         struct job *job;
716
717         assert(allowed_count >= 1);
718         assert(start_count >= 0);
719         assert(waiter_count > 0);
720         assert(start_count <= allowed_count);
721
722         rc = -1;
723         pthread_mutex_lock(&mutex);
724
725         /* check whether already running */
726         if (current_thread || allowed) {
727                 ERROR("thread already started");
728                 errno = EINVAL;
729                 goto error;
730         }
731
732         /* start */
733         if (sig_monitor_init() < 0) {
734                 ERROR("failed to initialise signal handlers");
735                 goto error;
736         }
737
738         /* records the allowed count */
739         allowed = allowed_count;
740         started = 0;
741         waiting = 0;
742         remains = waiter_count;
743
744         /* start at least one thread */
745         launched = 0;
746         while ((launched + 1) < start_count) {
747                 if (start_one_thread() != 0) {
748                         ERROR("Not all threads can be started");
749                         goto error;
750                 }
751                 launched++;
752         }
753
754         /* queue the start job */
755         job = job_create(NULL, 0, (job_cb_t)start, NULL);
756         if (!job) {
757                 ERROR("out of memory");
758                 errno = ENOMEM;
759                 goto error;
760         }
761         job_add(job);
762         remains--;
763
764         /* run until end */
765         thread_run(&me);
766         rc = 0;
767 error:
768         pthread_mutex_unlock(&mutex);
769         return rc;
770 }
771
772 /**
773  * Terminate all the threads and cancel all pending jobs.
774  */
775 void jobs_terminate()
776 {
777         struct job *job, *head, *tail;
778         pthread_t me, *others;
779         struct thread *t;
780         int count;
781
782         /* how am i? */
783         me = pthread_self();
784
785         /* request all threads to stop */
786         pthread_mutex_lock(&mutex);
787         allowed = 0;
788
789         /* count the number of threads */
790         count = 0;
791         t = threads;
792         while (t) {
793                 if (!t->upper && !pthread_equal(t->tid, me))
794                         count++;
795                 t = t->next;
796         }
797
798         /* fill the array of threads */
799         others = alloca(count * sizeof *others);
800         count = 0;
801         t = threads;
802         while (t) {
803                 if (!t->upper && !pthread_equal(t->tid, me))
804                         others[count++] = t->tid;
805                 t = t->next;
806         }
807
808         /* stops the threads */
809         t = threads;
810         while (t) {
811                 t->stop = 1;
812                 t = t->next;
813         }
814
815         /* wait the threads */
816         pthread_cond_broadcast(&cond);
817         pthread_mutex_unlock(&mutex);
818         while (count)
819                 pthread_join(others[--count], NULL);
820         pthread_mutex_lock(&mutex);
821
822         /* cancel pending jobs of other threads */
823         remains = 0;
824         head = first_job;
825         first_job = NULL;
826         tail = NULL;
827         while (head) {
828                 /* unlink the job */
829                 job = head;
830                 head = job->next;
831
832                 /* search if job is stacked for current */
833                 t = current_thread;
834                 while (t && t->job != job)
835                         t = t->upper;
836                 if (t) {
837                         /* yes, relink it at end */
838                         if (tail)
839                                 tail->next = job;
840                         else
841                                 first_job = job;
842                         tail = job;
843                         job->next = NULL;
844                 } else {
845                         /* no cancel the job */
846                         pthread_mutex_unlock(&mutex);
847                         sig_monitor(0, job_cancel, job);
848                         free(job);
849                         pthread_mutex_lock(&mutex);
850                 }
851         }
852         pthread_mutex_unlock(&mutex);
853 }
854