first add of asynchonous handling
[src/app-framework-binder.git] / src / utils-upoll.c
1 /*
2  * Copyright 2016 IoT.bzh
3  * Author: José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <sys/epoll.h>
19 #include <pthread.h>
20 #include <stdlib.h>
21 #include <errno.h>
22 #include <unistd.h>
23 #include <assert.h>
24
25 #include "utils-upoll.h"
26
27 struct upollfd;
28
29 /*
30  * Structure describing one opened client
31  */
32 struct upoll
33 {
34         struct upollfd *fd;     /* structure handling the file descriptor */
35         void (*read)(void *);   /* callback for handling on_readable */
36         void (*write)(void *);  /* callback for handling on_writable */
37         void (*hangup)(void *); /* callback for handling on_hangup */
38         void *closure;          /* closure for callbacks */
39         struct upoll *next;     /* next client of the same file descriptor */
40 };
41
42 /*
43  * Structure describing a watched file descriptor
44  */
45 struct upollfd
46 {
47         int fd;                 /* watch file descriptor */
48         uint32_t events;        /* watched events */
49         struct upollfd *next;   /* next watched file descriptor */
50         struct upoll *head;     /* first client watching the file descriptor */
51 };
52
53
54 /*
55  * Structure describing a upoll group
56 struct upollgrp
57 {
58         int pollfd;
59         struct upollfd *head;
60         struct upoll *current;
61         pthread_mutex_t mutex;
62 };
63
64
65 static struct upollgrp global = {
66         .pollfd = 0,
67         .head = NULL,
68         .current = NULL,
69         .mutex = PTHREAD_MUTEX_INITIALIZER
70 };
71  */
72
73 static int pollfd = 0;
74 static struct upollfd *head = NULL;
75 static struct upoll *current = NULL;
76 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
77
78 /*
79  * Compute the events for the set of clients
80  */
81 static int update_flags_locked(struct upollfd *ufd)
82 {
83         int rc;
84         struct upoll *u;
85         struct epoll_event e;
86         uint32_t events;
87
88         /* compute expected events */
89         events = 0;
90         u = ufd->head;
91         while (u != NULL) {
92                 if (u->read != NULL)
93                         events |= EPOLLIN;
94                 if (u->write != NULL)
95                         events |= EPOLLOUT;
96                 u = u->next;
97         }
98         if (ufd->events == events)
99                 rc = 0;
100         else {
101                 e.events = events;
102                 e.data.ptr = ufd;
103                 rc = epoll_ctl(pollfd, EPOLL_CTL_MOD, ufd->fd, &e);
104                 if (rc == 0)
105                         ufd->events = events;
106         }
107         pthread_mutex_unlock(&mutex);
108         return rc;
109 }
110
111 /*
112  * Compute the events for the set of clients
113  */
114 static int update_flags(struct upollfd *ufd)
115 {
116         pthread_mutex_lock(&mutex);
117         return update_flags_locked(ufd);
118 }
119
120 /*
121  *
122  */
123 static int update(struct upollfd *ufd)
124 {
125         struct upollfd **prv;
126
127         pthread_mutex_lock(&mutex);
128         if (ufd->head != NULL)
129                 return update_flags_locked(ufd);
130
131         /* no more watchers */
132         prv = &head;
133         while(*prv) {
134                 if (*prv == ufd) {
135                         *prv = ufd->next;
136                         break;
137                 }
138                 prv = &(*prv)->next;
139         }
140         pthread_mutex_unlock(&mutex);
141         epoll_ctl(pollfd, EPOLL_CTL_DEL, ufd->fd, NULL);
142         free(ufd);
143         return 0;
144 }
145
146 static struct upollfd *get_fd(int fd)
147 {
148         struct epoll_event e;
149         struct upollfd *result;
150         int rc;
151
152         /* opens the epoll stream */
153         if (pollfd == 0) {
154                 pollfd = epoll_create1(EPOLL_CLOEXEC);
155                 if (pollfd == 0) {
156                         pollfd = dup(0);
157                         close(0);
158                 }
159                 if (pollfd < 0) {
160                         pollfd = 0;
161                         return NULL;
162                 }
163         }
164
165         /* search */
166         result = head;
167         while (result != NULL) {
168                 if (result->fd == fd)
169                         return result;
170                 result = result->next;
171         }
172
173         /* allocates */
174         result = calloc(1, sizeof *result);
175         if (result == NULL)
176                 return NULL;
177
178         /* init */
179         result->fd = fd;
180         pthread_mutex_lock(&mutex);
181         result->next = head;
182         head = result;
183         pthread_mutex_unlock(&mutex);
184
185         /* records */
186         e.events = 0;
187         e.data.ptr = result;
188         rc = epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &e);
189         if (rc == 0)
190                 return result;
191
192         /* revert on error */
193         rc = errno;
194         update(result);
195         errno = rc;
196         return NULL;
197 }
198
199 int upoll_is_valid(struct upoll *upoll)
200 {
201         struct upollfd *itfd = head;
202         struct upoll *it;
203         while (itfd != NULL) {
204                 it = itfd->head;
205                 while (it != NULL) {
206                         if (it == upoll)
207                                 return 1;
208                         it = it->next;
209                 }
210                 itfd = itfd->next;
211         }
212         return 0;
213 }
214
215 struct upoll *upoll_open(int fd, void *closure)
216 {
217         struct upollfd *ufd;
218         struct upoll *result;
219
220         /* allocates */
221         result = calloc(1, sizeof *result);
222         if (result == NULL)
223                 return NULL;
224
225         /* get for fd */
226         ufd = get_fd(fd);
227         if (ufd == NULL) {
228                 free(result);
229                 return NULL;
230         }
231
232         /* init */
233         result->fd = ufd;
234         result->closure = closure;
235         pthread_mutex_lock(&mutex);
236         result->next = ufd->head;
237         ufd->head = result;
238         pthread_mutex_unlock(&mutex);
239         return result;
240 }
241
242 int upoll_on_readable(struct upoll *upoll, void (*process)(void *))
243 {
244         assert(pollfd != 0);
245         assert(upoll_is_valid(upoll));
246
247         upoll->read = process;
248         return update_flags(upoll->fd);
249 }
250
251 int upoll_on_writable(struct upoll *upoll, void (*process)(void *))
252 {
253         assert(pollfd != 0);
254         assert(upoll_is_valid(upoll));
255
256         upoll->write = process;
257         return update_flags(upoll->fd);
258 }
259
260 void upoll_on_hangup(struct upoll *upoll, void (*process)(void *))
261 {
262         assert(pollfd != 0);
263         assert(upoll_is_valid(upoll));
264
265         upoll->hangup = process;
266 }
267
268 void upoll_close(struct upoll *upoll)
269 {
270         struct upoll **it;
271         struct upollfd *ufd;
272
273         assert(pollfd != 0);
274         assert(upoll_is_valid(upoll));
275
276         ufd = upoll->fd;
277         pthread_mutex_lock(&mutex);
278         if (current == upoll)
279                 current = NULL;
280         it = &ufd->head;
281         while (*it != upoll)
282                 it = &(*it)->next;
283         *it = upoll->next;
284         pthread_mutex_unlock(&mutex);
285         free(upoll);
286         update(ufd);
287 }
288
289 int upoll_wait(int timeout)
290 {
291         int rc;
292         struct epoll_event e;
293         struct upollfd *ufd;
294
295         if (pollfd == 0) {
296                 errno = ECANCELED;
297                 return -1;
298         }
299
300         do {
301                 rc = epoll_wait(pollfd, &e, 1, timeout);
302         } while (rc < 0 && errno == EINTR);
303         if (rc == 1) {
304                 ufd = e.data.ptr;
305                 current = ufd->head;
306                 e.events &= EPOLLIN | EPOLLOUT | EPOLLHUP;
307                 while (current != NULL && e.events != 0) {
308                         if ((e.events & EPOLLIN) && current->read) {
309                                 current->read(current->closure);
310                                 e.events &= (uint32_t)~EPOLLIN;
311                                 continue;
312                         }
313                         if ((e.events & EPOLLOUT) && current->write) {
314                                 current->write(current->closure);
315                                 e.events &= (uint32_t)~EPOLLOUT;
316                                 continue;
317                         }
318                         if ((e.events & EPOLLHUP) && current->hangup) {
319                                 current->hangup(current->closure);
320                                 if (current == NULL)
321                                         break;
322                         }
323                         current = current->next;
324                 }
325         }
326         return rc < 0 ? rc : 0;
327 }
328