Add comments
[src/app-framework-binder.git] / src / utils-upoll.c
1 /*
2  * Copyright 2016 IoT.bzh
3  * Author: José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <sys/epoll.h>
19 #include <pthread.h>
20 #include <stdlib.h>
21 #include <errno.h>
22 #include <unistd.h>
23 #include <assert.h>
24
25 #include "utils-upoll.h"
26
27 struct upollfd;
28
29 /*
30  * Structure describing one opened client
31  */
32 struct upoll
33 {
34         struct upollfd *fd;     /* structure handling the file descriptor */
35         void (*read)(void *);   /* callback for handling on_readable */
36         void (*write)(void *);  /* callback for handling on_writable */
37         void (*hangup)(void *); /* callback for handling on_hangup */
38         void *closure;          /* closure for callbacks */
39         struct upoll *next;     /* next client of the same file descriptor */
40 };
41
42 /*
43  * Structure describing a watched file descriptor
44  */
45 struct upollfd
46 {
47         int fd;                 /* watch file descriptor */
48         uint32_t events;        /* watched events */
49         struct upollfd *next;   /* next watched file descriptor */
50         struct upoll *head;     /* first client watching the file descriptor */
51 };
52
53 /*
54  * Structure describing a upoll group
55  */
56 struct upollgrp
57 {
58         int pollfd;
59         struct upollfd *head;
60         struct upoll *current;
61         pthread_mutex_t mutex;
62 };
63
64 static struct upollgrp global = {
65         .pollfd = 0,
66         .head = NULL,
67         .current = NULL,
68         .mutex = PTHREAD_MUTEX_INITIALIZER
69 };
70
71 static int pollfd = 0;
72 static struct upollfd *head = NULL;
73 static struct upoll *current = NULL;
74 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
75
76 /*
77  *
78  */
79 static int update(struct upollfd *ufd)
80 {
81         int rc;
82         struct upoll *u;
83         struct epoll_event e;
84         uint32_t events;
85         struct upollfd **prv;
86
87         events = 0;
88         pthread_mutex_lock(&mutex);
89         u = ufd->head;
90         if (u == NULL) {
91                 /* no more watchers */
92                 prv = &head;
93                 while(*prv) {
94                         if (*prv == ufd) {
95                                 *prv = ufd->next;
96                                 break;
97                         }
98                         prv = &(*prv)->next;
99                 }
100                 pthread_mutex_unlock(&mutex);
101                 epoll_ctl(pollfd, EPOLL_CTL_DEL, ufd->fd, NULL);
102                 free(ufd);
103                 return 0;
104         }
105         /* compute the events for the watchers */
106         while (u != NULL) {
107                 if (u->read != NULL)
108                         events |= EPOLLIN;
109                 if (u->write != NULL)
110                         events |= EPOLLOUT;
111                 u = u->next;
112         }
113         pthread_mutex_unlock(&mutex);
114         if (ufd->events == events)
115                 return 0;
116         e.events = events;
117         e.data.ptr = ufd;
118         rc = epoll_ctl(pollfd, EPOLL_CTL_MOD, ufd->fd, &e);
119         if (rc == 0)
120                 ufd->events = events;
121         return rc;
122 }
123
124 /*
125  * Compute the events for the set of clients
126  */
127 static int update_flags(struct upollfd *ufd)
128 {
129         int rc;
130         struct upoll *u;
131         struct epoll_event e;
132         uint32_t events;
133         struct upollfd **prv;
134
135         /* compute expected events */
136         events = 0;
137         pthread_mutex_lock(&mutex);
138         u = ufd->head;
139         assert (u != NULL);
140         while (u != NULL) {
141                 if (u->read != NULL)
142                         events |= EPOLLIN;
143                 if (u->write != NULL)
144                         events |= EPOLLOUT;
145                 u = u->next;
146         }
147         if (ufd->events == events)
148                 rc = 0;
149         else {
150                 e.events = events;
151                 e.data.ptr = ufd;
152                 rc = epoll_ctl(pollfd, EPOLL_CTL_MOD, ufd->fd, &e);
153                 if (rc == 0)
154                         ufd->events = events;
155         }
156         pthread_mutex_unlock(&mutex);
157         return rc;
158 }
159
160 static struct upollfd *get_fd(int fd)
161 {
162         struct epoll_event e;
163         struct upollfd *result;
164         int rc;
165
166         /* opens the epoll stream */
167         if (pollfd == 0) {
168                 pollfd = epoll_create1(EPOLL_CLOEXEC);
169                 if (pollfd == 0) {
170                         pollfd = dup(0);
171                         close(0);
172                 }
173                 if (pollfd < 0) {
174                         pollfd = 0;
175                         return NULL;
176                 }
177         }
178
179         /* search */
180         result = head;
181         while (result != NULL) {
182                 if (result->fd == fd)
183                         return result;
184                 result = result->next;
185         }
186
187         /* allocates */
188         result = calloc(1, sizeof *result);
189         if (result == NULL)
190                 return NULL;
191
192         /* init */
193         result->fd = fd;
194         pthread_mutex_lock(&mutex);
195         result->next = head;
196         head = result;
197         pthread_mutex_unlock(&mutex);
198
199         /* records */
200         e.events = 0;
201         e.data.ptr = result;
202         rc = epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &e);
203         if (rc == 0)
204                 return result;
205
206         /* revert on error */
207         rc = errno;
208         update(result);
209         errno = rc;
210         return NULL;
211 }
212
213 int upoll_is_valid(struct upoll *upoll)
214 {
215         struct upollfd *itfd = head;
216         struct upoll *it;
217         while (itfd != NULL) {
218                 it = itfd->head;
219                 while (it != NULL) {
220                         if (it == upoll)
221                                 return 1;
222                         it = it->next;
223                 }
224                 itfd = itfd->next;
225         }
226         return 0;
227 }
228
229 struct upoll *upoll_open(int fd, void *closure)
230 {
231         struct upollfd *ufd;
232         struct upoll *result;
233
234         /* allocates */
235         result = calloc(1, sizeof *result);
236         if (result == NULL)
237                 return NULL;
238
239         /* get for fd */
240         ufd = get_fd(fd);
241         if (ufd == NULL) {
242                 free(result);
243                 return NULL;
244         }
245
246         /* init */
247         result->fd = ufd;
248         result->closure = closure;
249         pthread_mutex_lock(&mutex);
250         result->next = ufd->head;
251         ufd->head = result;
252         pthread_mutex_unlock(&mutex);
253         return result;
254 }
255
256 int upoll_on_readable(struct upoll *upoll, void (*process)(void *))
257 {
258         assert(pollfd != 0);
259         assert(upoll_is_valid(upoll));
260
261         upoll->read = process;
262         return update(upoll->fd);
263 }
264
265 int upoll_on_writable(struct upoll *upoll, void (*process)(void *))
266 {
267         assert(pollfd != 0);
268         assert(upoll_is_valid(upoll));
269
270         upoll->write = process;
271         return update(upoll->fd);
272 }
273
274 void upoll_on_hangup(struct upoll *upoll, void (*process)(void *))
275 {
276         assert(pollfd != 0);
277         assert(upoll_is_valid(upoll));
278
279         upoll->hangup = process;
280 }
281
282 void upoll_close(struct upoll *upoll)
283 {
284         struct upoll **it;
285         struct upollfd *ufd;
286
287         assert(pollfd != 0);
288         assert(upoll_is_valid(upoll));
289
290         ufd = upoll->fd;
291         pthread_mutex_lock(&mutex);
292         if (current == upoll)
293                 current = NULL;
294         it = &ufd->head;
295         while (*it != upoll)
296                 it = &(*it)->next;
297         *it = upoll->next;
298         pthread_mutex_unlock(&mutex);
299         free(upoll);
300         update(ufd);
301 }
302
303 int upoll_wait(int timeout)
304 {
305         int rc;
306         struct epoll_event e;
307         struct upollfd *ufd;
308
309         if (pollfd == 0) {
310                 errno = ECANCELED;
311                 return -1;
312         }
313
314         do {
315                 rc = epoll_wait(pollfd, &e, 1, timeout);
316         } while (rc < 0 && errno == EINTR);
317         if (rc == 1) {
318                 ufd = e.data.ptr;
319                 current = ufd->head;
320                 e.events &= EPOLLIN | EPOLLOUT | EPOLLHUP;
321                 while (current != NULL && e.events != 0) {
322                         if ((e.events & EPOLLIN) && current->read) {
323                                 current->read(current->closure);
324                                 e.events &= (uint32_t)~EPOLLIN;
325                                 continue;
326                         }
327                         if ((e.events & EPOLLOUT) && current->write) {
328                                 current->write(current->closure);
329                                 e.events &= (uint32_t)~EPOLLOUT;
330                                 continue;
331                         }
332                         if ((e.events & EPOLLHUP) && current->hangup) {
333                                 current->hangup(current->closure);
334                                 if (current == NULL)
335                                         break;
336                         }
337                         current = current->next;
338                 }
339         }
340         return rc < 0 ? rc : 0;
341 }
342