2 * Copyright (C) 2016, 2017 "IoT.bzh"
3 * Author José Bollo <jose.bollo@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
24 #include <sys/syscall.h>
29 #include <afb/afb-req-itf.h>
31 #include "afb-thread.h"
32 #include "afb-sig-handler.h"
35 /* control of threads */
38 pthread_t tid; /* the thread id */
39 unsigned stop: 1; /* stop request */
40 unsigned ended: 1; /* ended status */
41 unsigned works: 1; /* is it processing a job? */
44 /* describes pending job */
47 void (*callback)(struct afb_req req); /* processing callback */
48 struct afb_req req; /* request to be processed */
49 int timeout; /* timeout in second for processing the request */
50 int blocked; /* is an other request blocking this one ? */
51 void *group; /* group of the request */
52 struct job *next; /* link to the next job enqueued */
55 /* synchronisation of threads */
56 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
57 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
59 /* queue of pending jobs */
60 static struct job *first_job = NULL;
62 /* count allowed, started and running threads */
63 static int allowed = 0;
64 static int started = 0;
65 static int running = 0;
66 static int remains = 0;
69 static struct thread *threads = NULL;
72 static _Thread_local int thread_timer_set;
73 static _Thread_local timer_t thread_timerid;
76 * Creates a timer for the current thread
78 * Returns 0 in case of success
80 int afb_thread_timer_create()
88 sevp.sigev_notify = SIGEV_THREAD_ID;
89 sevp.sigev_signo = SIGALRM;
90 sevp.sigev_value.sival_ptr = NULL;
91 #if defined(sigev_notify_thread_id)
92 sevp.sigev_notify_thread_id = (pid_t)syscall(SYS_gettid);
94 sevp._sigev_un._tid = (pid_t)syscall(SYS_gettid);
96 rc = timer_create(CLOCK_THREAD_CPUTIME_ID, &sevp, &thread_timerid);
97 thread_timer_set = !rc;
103 * Arms the alarm in timeout seconds for the current thread
105 int afb_thread_timer_arm(int timeout)
108 struct itimerspec its;
110 rc = afb_thread_timer_create();
112 its.it_interval.tv_sec = 0;
113 its.it_interval.tv_nsec = 0;
114 its.it_value.tv_sec = timeout;
115 its.it_value.tv_nsec = 0;
116 rc = timer_settime(thread_timerid, 0, &its, NULL);
123 * Disarms the current alarm
125 void afb_thread_timer_disarm()
127 if (thread_timer_set)
128 afb_thread_timer_arm(0);
132 * Delstroy any alarm resource for the current thread
134 void afb_thread_timer_delete()
136 if (thread_timer_set) {
137 timer_delete(thread_timerid);
138 thread_timer_set = 0;
142 /* add the job to the list */
143 static inline void job_add(struct job *job)
145 void *group = job->group;
146 struct job *ijob, **pjob;
154 if (ijob->group == group)
164 /* get the next job to process or NULL if none */
165 static inline struct job *job_get()
167 struct job *job, **pjob;
170 while (job && job->blocked) {
181 /* unblock a group of job */
182 static inline void job_unblock(void *group)
188 if (job->group == group) {
196 /* main loop of processing threads */
197 static void *thread_main_loop(void *data)
199 struct thread *me = data;
204 afb_thread_timer_create();
205 pthread_mutex_lock(&mutex);
209 if (job == NULL && first_job != NULL && running == 0) {
210 /* sad situation!! should not happen */
211 ERROR("threads are blocked!");
213 first_job = job->next;
217 pthread_cond_wait(&cond, &mutex);
222 pthread_mutex_unlock(&mutex);
225 afb_thread_timer_arm(j.timeout);
226 afb_sig_req(j.req, j.callback);
227 afb_thread_timer_disarm();
228 afb_req_unref(j.req);
229 pthread_mutex_lock(&mutex);
231 job_unblock(j.group);
238 pthread_mutex_unlock(&mutex);
239 afb_thread_timer_delete();
243 /* start a new thread */
244 static int start_one_thread()
249 assert(started < allowed);
251 t = &threads[started++];
253 rc = pthread_create(&t->tid, NULL, thread_main_loop, t);
257 WARNING("not able to start thread: %m");
263 /* process the 'request' with the 'callback' using a separate thread if available */
264 void afb_thread_call(struct afb_req req, void (*callback)(struct afb_req req), int timeout, void *group)
270 /* allocates the job */
271 job = malloc(sizeof *job);
273 info = "out of memory";
277 /* start a thread if needed */
278 pthread_mutex_lock(&mutex);
280 info = "too many jobs";
283 if (started == running && started < allowed) {
284 rc = start_one_thread();
285 if (rc < 0 && started == 0) {
286 /* failed to start threading */
287 info = "can't start thread";
292 /* fills and queues the job */
293 job->callback = callback;
295 job->timeout = timeout;
300 pthread_mutex_unlock(&mutex);
302 /* signal an existing job */
303 pthread_cond_signal(&cond);
307 pthread_mutex_unlock(&mutex);
310 ERROR("can't process job with threads: %s", info);
311 afb_req_fail(req, "internal-error", info);
314 /* initialise the threads */
315 int afb_thread_init(int allowed_count, int start_count, int waiter_count)
317 threads = calloc(allowed_count, sizeof *threads);
318 if (threads == NULL) {
320 ERROR("can't allocate threads");
324 /* records the allowed count */
325 allowed = allowed_count;
328 remains = waiter_count;
330 /* start at least one thread */
331 pthread_mutex_lock(&mutex);
332 while (started < start_count && start_one_thread() == 0);
333 pthread_mutex_unlock(&mutex);
336 return -(started != start_count);
339 /* terminate all the threads and all pending requests */
340 void afb_thread_terminate()
345 /* request all threads to stop */
346 pthread_mutex_lock(&mutex);
349 for (i = 0 ; i < n ; i++)
352 /* wait until all thread are terminated */
353 while (started != 0) {
355 pthread_mutex_unlock(&mutex);
356 pthread_cond_broadcast(&cond);
357 pthread_mutex_lock(&mutex);
359 /* join the terminated threads */
360 for (i = 0 ; i < n ; i++) {
361 if (threads[i].tid && threads[i].ended) {
362 pthread_join(threads[i].tid, NULL);
368 pthread_mutex_unlock(&mutex);
371 /* cancel pending jobs */
374 first_job = job->next;
375 afb_req_fail(job->req, "aborted", "termination of threading");
376 afb_req_unref(job->req);