2a270250918410b7b12ae3a133dd024dc3b735a7
[src/app-framework-binder.git] / src / utils-upoll.c
1 /*
2  * Copyright 2016 IoT.bzh
3  * Author: José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <sys/epoll.h>
19 #include <pthread.h>
20 #include <stdlib.h>
21 #include <errno.h>
22 #include <unistd.h>
23 #include <assert.h>
24
25 #include "utils-upoll.h"
26
27
struct upollfd;

/*
 * One watcher attached to a file descriptor.
 * Several watchers may share the same 'struct upollfd'; they are
 * chained together through 'next'.
 */
struct upoll
{
        struct upollfd *fd;     /* record of the watched file descriptor */
        void (*read)(void *);   /* called with 'closure' on EPOLLIN, NULL if unused */
        void (*write)(void *);  /* called with 'closure' on EPOLLOUT, NULL if unused */
        void (*hangup)(void *); /* called with 'closure' on EPOLLHUP, NULL if unused */
        void *closure;          /* opaque argument handed to the callbacks */
        struct upoll *next;     /* next watcher of the same file descriptor */
};
39
/*
 * Per file descriptor record: one exists for each descriptor
 * registered in the epoll instance.
 */
struct upollfd
{
        int fd;                 /* the watched file descriptor */
        uint32_t events;        /* event mask last successfully set with epoll_ctl */
        struct upollfd *next;   /* next record in the global list */
        struct upoll *head;     /* first watcher attached to this descriptor */
};
47
/* guards the modifications of the lists below */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

/* the epoll file descriptor; 0 means "not opened yet" */
static int pollfd = 0;

/* list of the records of the registered file descriptors */
static struct upollfd *head = NULL;

/* watcher being dispatched by upoll_wait; reset to NULL when it is closed */
static struct upoll *current = NULL;
52
53 static int update(struct upollfd *ufd)
54 {
55         int rc;
56         struct upoll *u;
57         struct epoll_event e;
58         uint32_t events;
59         struct upollfd **prv;
60
61         events = 0;
62         pthread_mutex_lock(&mutex);
63         u = ufd->head;
64         if (u == NULL) {
65                 /* no more watchers */
66                 prv = &head;
67                 while(*prv) {
68                         if (*prv == ufd) {
69                                 *prv = ufd->next;
70                                 break;
71                         }
72                         prv = &(*prv)->next;
73                 }
74                 pthread_mutex_unlock(&mutex);
75                 epoll_ctl(pollfd, EPOLL_CTL_DEL, ufd->fd, NULL);
76                 free(ufd);
77                 return 0;
78         }
79         /* compute the events for the watchers */
80         while (u != NULL) {
81                 if (u->read != NULL)
82                         events |= EPOLLIN;
83                 if (u->write != NULL)
84                         events |= EPOLLOUT;
85                 u = u->next;
86         }
87         pthread_mutex_unlock(&mutex);
88         if (ufd->events == events)
89                 return 0;
90         e.events = events;
91         e.data.ptr = ufd;
92         rc = epoll_ctl(pollfd, EPOLL_CTL_MOD, ufd->fd, &e);
93         if (rc == 0)
94                 ufd->events = events;
95         return rc;
96 }
97
98 static struct upollfd *get_fd(int fd)
99 {
100         struct epoll_event e;
101         struct upollfd *result;
102         int rc;
103
104         /* opens the epoll stream */
105         if (pollfd == 0) {
106                 pollfd = epoll_create1(EPOLL_CLOEXEC);
107                 if (pollfd == 0) {
108                         pollfd = dup(0);
109                         close(0);
110                 }
111                 if (pollfd < 0) {
112                         pollfd = 0;
113                         return NULL;
114                 }
115         }
116
117         /* search */
118         result = head;
119         while (result != NULL) {
120                 if (result->fd == fd)
121                         return result;
122                 result = result->next;
123         }
124
125         /* allocates */
126         result = calloc(1, sizeof *result);
127         if (result == NULL)
128                 return NULL;
129
130         /* init */
131         result->fd = fd;
132         pthread_mutex_lock(&mutex);
133         result->next = head;
134         head = result;
135         pthread_mutex_unlock(&mutex);
136
137         /* records */
138         e.events = 0;
139         e.data.ptr = result;
140         rc = epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &e);
141         if (rc == 0)
142                 return result;
143
144         /* revert on error */
145         rc = errno;
146         update(result);
147         errno = rc;
148         return NULL;
149 }
150
151 int upoll_is_valid(struct upoll *upoll)
152 {
153         struct upollfd *itfd = head;
154         struct upoll *it;
155         while (itfd != NULL) {
156                 it = itfd->head;
157                 while (it != NULL) {
158                         if (it == upoll)
159                                 return 1;
160                         it = it->next;
161                 }
162                 itfd = itfd->next;
163         }
164         return 0;
165 }
166
167 struct upoll *upoll_open(int fd, void *closure)
168 {
169         struct upollfd *ufd;
170         struct upoll *result;
171
172         /* allocates */
173         result = calloc(1, sizeof *result);
174         if (result == NULL)
175                 return NULL;
176
177         /* get for fd */
178         ufd = get_fd(fd);
179         if (ufd == NULL) {
180                 free(result);
181                 return NULL;
182         }
183
184         /* init */
185         result->fd = ufd;
186         result->closure = closure;
187         pthread_mutex_lock(&mutex);
188         result->next = ufd->head;
189         ufd->head = result;
190         pthread_mutex_unlock(&mutex);
191         return result;
192 }
193
194 int upoll_on_readable(struct upoll *upoll, void (*process)(void *))
195 {
196         assert(pollfd != 0);
197         assert(upoll_is_valid(upoll));
198
199         upoll->read = process;
200         return update(upoll->fd);
201 }
202
203 int upoll_on_writable(struct upoll *upoll, void (*process)(void *))
204 {
205         assert(pollfd != 0);
206         assert(upoll_is_valid(upoll));
207
208         upoll->write = process;
209         return update(upoll->fd);
210 }
211
212 void upoll_on_hangup(struct upoll *upoll, void (*process)(void *))
213 {
214         assert(pollfd != 0);
215         assert(upoll_is_valid(upoll));
216
217         upoll->hangup = process;
218 }
219
220 void upoll_close(struct upoll *upoll)
221 {
222         struct upoll **it;
223         struct upollfd *ufd;
224
225         assert(pollfd != 0);
226         assert(upoll_is_valid(upoll));
227
228         ufd = upoll->fd;
229         pthread_mutex_lock(&mutex);
230         if (current == upoll)
231                 current = NULL;
232         it = &ufd->head;
233         while (*it != upoll)
234                 it = &(*it)->next;
235         *it = upoll->next;
236         pthread_mutex_unlock(&mutex);
237         free(upoll);
238         update(ufd);
239 }
240
241 int upoll_wait(int timeout)
242 {
243         int rc;
244         struct epoll_event e;
245         struct upollfd *ufd;
246
247         if (pollfd == 0) {
248                 errno = ECANCELED;
249                 return -1;
250         }
251
252         do {
253                 rc = epoll_wait(pollfd, &e, 1, timeout);
254         } while (rc < 0 && errno == EINTR);
255         if (rc == 1) {
256                 ufd = e.data.ptr;
257                 current = ufd->head;
258                 e.events &= EPOLLIN | EPOLLOUT | EPOLLHUP;
259                 while (current != NULL && e.events != 0) {
260                         if ((e.events & EPOLLIN) && current->read) {
261                                 current->read(current->closure);
262                                 e.events &= (uint32_t)~EPOLLIN;
263                                 continue;
264                         }
265                         if ((e.events & EPOLLOUT) && current->write) {
266                                 current->write(current->closure);
267                                 e.events &= (uint32_t)~EPOLLOUT;
268                                 continue;
269                         }
270                         if ((e.events & EPOLLHUP) && current->hangup) {
271                                 current->hangup(current->closure);
272                                 if (current == NULL)
273                                         break;
274                         }
275                         current = current->next;
276                 }
277         }
278         return rc < 0 ? rc : 0;
279 }
280