Adds 2017 to copyrights
[src/app-framework-binder.git] / src / afb-thread.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <signal.h>
23 #include <time.h>
24 #include <sys/syscall.h>
25 #include <pthread.h>
26 #include <errno.h>
27 #include <assert.h>
28
29 #include <afb/afb-req-itf.h>
30
31 #include "afb-thread.h"
32 #include "afb-sig-handler.h"
33 #include "verbose.h"
34
35 /* control of threads */
36 struct thread
37 {
38         pthread_t tid;     /* the thread id */
39         unsigned stop: 1;  /* stop request */
40         unsigned ended: 1; /* ended status */
41         unsigned works: 1; /* is it processing a job? */
42 };
43
44 /* describes pending job */
45 struct job
46 {
47         void (*callback)(struct afb_req req); /* processing callback */
48         struct afb_req req; /* request to be processed */
49         int timeout;        /* timeout in second for processing the request */
50         int blocked;        /* is an other request blocking this one ? */
51         void *group;        /* group of the request */
52         struct job *next;   /* link to the next job enqueued */
53 };
54
55 /* synchronisation of threads */
56 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
57 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
58
59 /* queue of pending jobs */
60 static struct job *first_job = NULL;
61
62 /* count allowed, started and running threads */
63 static int allowed = 0;
64 static int started = 0;
65 static int running = 0;
66 static int remains = 0;
67
68 /* list of threads */
69 static struct thread *threads = NULL;
70
71 /* local timers */
72 static _Thread_local int thread_timer_set;
73 static _Thread_local timer_t thread_timerid;
74
75 /*
76  * Creates a timer for the current thread
77  *
78  * Returns 0 in case of success
79  */
80 int afb_thread_timer_create()
81 {
82         int rc;
83         struct sigevent sevp;
84
85         if (thread_timer_set)
86                 rc = 0;
87         else {
88                 sevp.sigev_notify = SIGEV_THREAD_ID;
89                 sevp.sigev_signo = SIGALRM;
90                 sevp.sigev_value.sival_ptr = NULL;
91 #if defined(sigev_notify_thread_id)
92                 sevp.sigev_notify_thread_id = (pid_t)syscall(SYS_gettid);
93 #else
94                 sevp._sigev_un._tid = (pid_t)syscall(SYS_gettid);
95 #endif
96                 rc = timer_create(CLOCK_THREAD_CPUTIME_ID, &sevp, &thread_timerid);
97                 thread_timer_set = !rc;
98         }
99         return 0;
100 }
101
102 /*
103  * Arms the alarm in timeout seconds for the current thread
104  */
105 int afb_thread_timer_arm(int timeout)
106 {
107         int rc;
108         struct itimerspec its;
109
110         rc = afb_thread_timer_create();
111         if (rc == 0) {
112                 its.it_interval.tv_sec = 0;
113                 its.it_interval.tv_nsec = 0;
114                 its.it_value.tv_sec = timeout;
115                 its.it_value.tv_nsec = 0;
116                 rc = timer_settime(thread_timerid, 0, &its, NULL);
117         }
118
119         return rc;
120 }
121
122 /*
123  * Disarms the current alarm
124  */
125 void afb_thread_timer_disarm()
126 {
127         if (thread_timer_set)
128                 afb_thread_timer_arm(0);
129 }
130
131 /*
132  * Delstroy any alarm resource for the current thread
133  */
134 void afb_thread_timer_delete()
135 {
136         if (thread_timer_set) {
137                 timer_delete(thread_timerid);
138                 thread_timer_set = 0;
139         }
140 }
141
142 /* add the job to the list */
143 static inline void job_add(struct job *job)
144 {
145         void *group = job->group;
146         struct job *ijob, **pjob;
147
148         pjob = &first_job;
149         ijob = first_job;
150         group = job->group;
151         if (group == NULL)
152                 group = job;
153         while (ijob) {
154                 if (ijob->group == group)
155                         job->blocked = 1;
156                 pjob = &ijob->next;
157                 ijob = ijob->next;
158         }
159         *pjob = job;
160         job->next = NULL;
161         remains--;
162 }
163
164 /* get the next job to process or NULL if none */
165 static inline struct job *job_get()
166 {
167         struct job *job, **pjob;
168         pjob = &first_job;
169         job = first_job;
170         while (job && job->blocked) {
171                 pjob = &job->next;
172                 job = job->next;
173         }
174         if (job) {
175                 *pjob = job->next;
176                 remains++;
177         }
178         return job;
179 }
180
181 /* unblock a group of job */
182 static inline void job_unblock(void *group)
183 {
184         struct job *job;
185
186         job = first_job;
187         while (job) {
188                 if (job->group == group) {
189                         job->blocked = 0;
190                         break;
191                 }
192                 job = job->next;
193         }
194 }
195
196 /* main loop of processing threads */
197 static void *thread_main_loop(void *data)
198 {
199         struct thread *me = data;
200         struct job *job, j;
201
202         me->works = 0;
203         me->ended = 0;
204         afb_thread_timer_create();
205         pthread_mutex_lock(&mutex);
206         while (!me->stop) {
207                 /* get a job */
208                 job = job_get();
209                 if (job == NULL && first_job != NULL && running == 0) {
210                         /* sad situation!! should not happen */
211                         ERROR("threads are blocked!");
212                         job = first_job;
213                         first_job = job->next;
214                 }
215                 if (job == NULL) {
216                         /* no job... */
217                         pthread_cond_wait(&cond, &mutex);
218                 } else {
219                         /* run the job */
220                         running++;
221                         me->works = 1;
222                         pthread_mutex_unlock(&mutex);
223                         j = *job;
224                         free(job);
225                         afb_thread_timer_arm(j.timeout);
226                         afb_sig_req(j.req, j.callback);
227                         afb_thread_timer_disarm();
228                         afb_req_unref(j.req);
229                         pthread_mutex_lock(&mutex);
230                         if (j.group != NULL)
231                                 job_unblock(j.group);
232                         me->works = 0;
233                         running--;
234                 }
235
236         }
237         me->ended = 1;
238         pthread_mutex_unlock(&mutex);
239         afb_thread_timer_delete();
240         return me;
241 }
242
243 /* start a new thread */
244 static int start_one_thread()
245 {
246         struct thread *t;
247         int rc;
248
249         assert(started < allowed);
250
251         t = &threads[started++];
252         t->stop = 0;
253         rc = pthread_create(&t->tid, NULL, thread_main_loop, t);
254         if (rc != 0) {
255                 started--;
256                 errno = rc;
257                 WARNING("not able to start thread: %m");
258                 rc = -1;
259         }
260         return rc;
261 }
262
263 /* process the 'request' with the 'callback' using a separate thread if available */
264 void afb_thread_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group)
265 {
266         const char *info;
267         struct job *job;
268         int rc;
269
270         /* allocates the job */
271         job = malloc(sizeof *job);
272         if (job == NULL) {
273                 info = "out of memory";
274                 goto error;
275         }
276
277         /* start a thread if needed */
278         pthread_mutex_lock(&mutex);
279         if (remains == 0) {
280                 info = "too many jobs";
281                 goto error2;
282         }
283         if (started == running && started < allowed) {
284                 rc = start_one_thread();
285                 if (rc < 0 && started == 0) {
286                         /* failed to start threading */
287                         info = "can't start thread";
288                         goto error2;
289                 }
290         }
291
292         /* fills and queues the job */
293         job->callback = callback;
294         job->req = req;
295         job->timeout = timeout;
296         job->blocked = 0;
297         job->group = group;
298         afb_req_addref(req);
299         job_add(job);
300         pthread_mutex_unlock(&mutex);
301
302         /* signal an existing job */
303         pthread_cond_signal(&cond);
304         return;
305
306 error2:
307         pthread_mutex_unlock(&mutex);
308         free(job);
309 error:
310         ERROR("can't process job with threads: %s", info);
311         afb_req_fail(req, "internal-error", info);
312 }
313
314 /* initialise the threads */
315 int afb_thread_init(int allowed_count, int start_count, int waiter_count)
316 {
317         threads = calloc(allowed_count, sizeof *threads);
318         if (threads == NULL) {
319                 errno = ENOMEM;
320                 ERROR("can't allocate threads");
321                 return -1;
322         }
323
324         /* records the allowed count */
325         allowed = allowed_count;
326         started = 0;
327         running = 0;
328         remains = waiter_count;
329
330         /* start at least one thread */
331         pthread_mutex_lock(&mutex);
332         while (started < start_count && start_one_thread() == 0);
333         pthread_mutex_unlock(&mutex);
334
335         /* end */
336         return -(started != start_count);
337 }
338
339 /* terminate all the threads and all pending requests */
340 void afb_thread_terminate()
341 {
342         int i, n;
343         struct job *job;
344
345         /* request all threads to stop */
346         pthread_mutex_lock(&mutex);
347         allowed = 0;
348         n = started;
349         for (i = 0 ; i < n ; i++)
350                 threads[i].stop = 1;
351
352         /* wait until all thread are terminated */
353         while (started != 0) {
354                 /* signal threads */
355                 pthread_mutex_unlock(&mutex);
356                 pthread_cond_broadcast(&cond);
357                 pthread_mutex_lock(&mutex);
358
359                 /* join the terminated threads */
360                 for (i = 0 ; i < n ; i++) {
361                         if (threads[i].tid && threads[i].ended) {
362                                 pthread_join(threads[i].tid, NULL);
363                                 threads[i].tid = 0;
364                                 started--;
365                         }
366                 }
367         }
368         pthread_mutex_unlock(&mutex);
369         free(threads);
370
371         /* cancel pending jobs */
372         while (first_job) {
373                 job = first_job;
374                 first_job = job->next;
375                 afb_req_fail(job->req, "aborted", "termination of threading");
376                 afb_req_unref(job->req);
377                 free(job);
378         }
379 }