Switch to libsystemd events
[src/app-framework-binder.git] / src / afb-ws.c
1 /*
2  * Copyright 2016 IoT.bzh
3  * Author: José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19 #include <stdlib.h>
20 #include <unistd.h>
21 #include <stdint.h>
22 #include <assert.h>
23 #include <errno.h>
24 #include <sys/uio.h>
25 #include <string.h>
26
27 #include <systemd/sd-event.h>
28
29 #include "websock.h"
30 #include "afb-ws.h"
31
32 #include "afb-common.h"
33
34 /*
35  * declaration of the websock interface for afb-ws
36  */
37 static ssize_t aws_writev(struct afb_ws *ws, const struct iovec *iov, int iovcnt);
38 static ssize_t aws_readv(struct afb_ws *ws, const struct iovec *iov, int iovcnt);
39 static void aws_on_close(struct afb_ws *ws, uint16_t code, size_t size);
40 static void aws_on_text(struct afb_ws *ws, int last, size_t size);
41 static void aws_on_binary(struct afb_ws *ws, int last, size_t size);
42 static void aws_on_continue(struct afb_ws *ws, int last, size_t size);
43 static void aws_on_readable(struct afb_ws *ws);
44 static void aws_on_error(struct afb_ws *ws, uint16_t code, const void *data, size_t size);
45
46 static struct websock_itf aws_itf = {
47         .writev = (void*)aws_writev,
48         .readv = (void*)aws_readv,
49
50         .on_ping = NULL,
51         .on_pong = NULL,
52         .on_close = (void*)aws_on_close,
53         .on_text = (void*)aws_on_text,
54         .on_binary = (void*)aws_on_binary,
55         .on_continue = (void*)aws_on_continue,
56         .on_extension = NULL,
57
58         .on_error = (void*)aws_on_error
59 };
60
61 /*
62  * a common scheme of buffer handling
63  */
64 struct buf
65 {
66         char *buffer;
67         size_t size;
68 };
69
70 /*
71  * the state
72  */
73 enum state
74 {
75         waiting,
76         reading_text,
77         reading_binary
78 };
79
80 /*
81  * the afb_ws structure
82  */
83 struct afb_ws
84 {
85         int fd;                 /* the socket file descriptor */
86         enum state state;       /* current state */
87         const struct afb_ws_itf *itf; /* the callback interface */
88         void *closure;          /* closure when calling the callbacks */
89         struct websock *ws;     /* the websock handler */
90         sd_event_source *evsrc; /* the event source for the socket */
91         struct buf buffer;      /* the last read fragment */
92 };
93
94 /*
95  * Returns the current buffer of 'ws' that is reset.
96  */
97 static inline struct buf aws_pick_buffer(struct afb_ws *ws)
98 {
99         struct buf result = ws->buffer;
100         ws->buffer.buffer = NULL;
101         ws->buffer.size = 0;
102         return result;
103 }
104
105 /*
106  * Disconnect the websocket 'ws' and calls on_hangup if
107  * 'call_on_hangup' is not null.
108  */
109 static void aws_disconnect(struct afb_ws *ws, int call_on_hangup)
110 {
111         struct websock *wsi = ws->ws;
112         if (wsi != NULL) {
113                 ws->ws = NULL;
114                 sd_event_source_unref(ws->evsrc);
115                 ws->evsrc = NULL;
116                 websock_destroy(wsi);
117                 free(aws_pick_buffer(ws).buffer);
118                 ws->state = waiting;
119                 if (call_on_hangup && ws->itf->on_hangup)
120                         ws->itf->on_hangup(ws->closure);
121         }
122 }
123
124 static int io_event_callback(sd_event_source *src, int fd, uint32_t revents, void *ws)
125 {
126         if ((revents & EPOLLIN) != 0)
127                 aws_on_readable(ws);
128         if ((revents & EPOLLHUP) != 0)
129                 afb_ws_hangup(ws);
130         return 0;
131 }
132
133 /*
134  * Creates the afb_ws structure for the file descritor
135  * 'fd' and the callbacks described by the interface 'itf'
136  * and its 'closure'.
137  *
138  * Returns the handle for the afb_ws created or NULL on error.
139  */
140 struct afb_ws *afb_ws_create(int fd, const struct afb_ws_itf *itf, void *closure)
141 {
142         int rc;
143         struct afb_ws *result;
144
145         assert(fd >= 0);
146
147         /* allocation */
148         result = malloc(sizeof * result);
149         if (result == NULL)
150                 goto error;
151
152         /* init */
153         result->fd = fd;
154         result->state = waiting;
155         result->itf = itf;
156         result->closure = closure;
157         result->buffer.buffer = NULL;
158         result->buffer.size = 0;
159
160         /* creates the websocket */
161         result->ws = websock_create_v13(&aws_itf, result);
162         if (result->ws == NULL)
163                 goto error2;
164
165         /* creates the evsrc */
166         rc = sd_event_add_io(afb_common_get_event_loop(), &result->evsrc, result->fd, EPOLLIN, io_event_callback, result);
167         if (rc < 0) {
168                 errno = -rc;
169                 goto error3;
170         }
171         return result;
172
173 error3:
174         websock_destroy(result->ws);
175 error2:
176         free(result);
177 error:
178         return NULL;
179 }
180
181 /*
182  * Destroys the websocket 'ws'
183  * It first hangup (but without calling on_hangup for safety reasons)
184  * if needed.
185  */
186 void afb_ws_destroy(struct afb_ws *ws)
187 {
188         aws_disconnect(ws, 0);
189         free(ws);
190 }
191
192 /*
193  * Hangup the websocket 'ws'
194  */
195 void afb_ws_hangup(struct afb_ws *ws)
196 {
197         aws_disconnect(ws, 1);
198 }
199
200 /*
201  * Sends a 'close' command to the endpoint of 'ws' with the 'code' and the
202  * 'reason' (that can be NULL and that else should not be greater than 123
203  * characters).
204  * Returns 0 on success or -1 in case of error.
205  */
206 int afb_ws_close(struct afb_ws *ws, uint16_t code, const char *reason)
207 {
208         if (ws->ws == NULL) {
209                 /* disconnected */
210                 errno = EPIPE;
211                 return -1;
212         }
213         return websock_close(ws->ws, code, reason, reason == NULL ? 0 : strlen(reason));
214 }
215
216 /*
217  * Sends a 'close' command to the endpoint of 'ws' with the 'code' and the
218  * 'reason' (that can be NULL and that else should not be greater than 123
219  * characters).
220  * Raise an error after 'close' command is sent.
221  * Returns 0 on success or -1 in case of error.
222  */
223 int afb_ws_error(struct afb_ws *ws, uint16_t code, const char *reason)
224 {
225         if (ws->ws == NULL) {
226                 /* disconnected */
227                 errno = EPIPE;
228                 return -1;
229         }
230         return websock_error(ws->ws, code, reason, reason == NULL ? 0 : strlen(reason));
231 }
232
233 /*
234  * Sends a 'text' of 'length' to the endpoint of 'ws'.
235  * Returns 0 on success or -1 in case of error.
236  */
237 int afb_ws_text(struct afb_ws *ws, const char *text, size_t length)
238 {
239         if (ws->ws == NULL) {
240                 /* disconnected */
241                 errno = EPIPE;
242                 return -1;
243         }
244         return websock_text(ws->ws, 1, text, length);
245 }
246
247 /*
248  * Sends a binary 'data' of 'length' to the endpoint of 'ws'.
249  * Returns 0 on success or -1 in case of error.
250  */
251 int afb_ws_binary(struct afb_ws *ws, const void *data, size_t length)
252 {
253         if (ws->ws == NULL) {
254                 /* disconnected */
255                 errno = EPIPE;
256                 return -1;
257         }
258         return websock_binary(ws->ws, 1, data, length);
259 }
260
261 /*
262  * callback for writing data
263  */
264 static ssize_t aws_writev(struct afb_ws *ws, const struct iovec *iov, int iovcnt)
265 {
266         ssize_t rc;
267         do {
268                 rc = writev(ws->fd, iov, iovcnt);
269         } while(rc == -1 && errno == EINTR);
270         return rc;
271 }
272
273 /*
274  * callback for reading data
275  */
276 static ssize_t aws_readv(struct afb_ws *ws, const struct iovec *iov, int iovcnt)
277 {
278         ssize_t rc;
279         do {
280                 rc = readv(ws->fd, iov, iovcnt);
281         } while(rc == -1 && errno == EINTR);
282         if (rc == 0) {
283                 errno = EPIPE;
284                 rc = -1;
285         }
286         return rc;
287 }
288
289 /*
290  * callback on incoming data
291  */
292 static void aws_on_readable(struct afb_ws *ws)
293 {
294         int rc;
295
296         assert(ws->ws != NULL);
297         rc = websock_dispatch(ws->ws);
298         if (rc < 0 && errno == EPIPE)
299                 afb_ws_hangup(ws);
300 }
301
302 /*
303  * Reads from the websocket handled by 'ws' data of length 'size'
304  * and append it to the current buffer of 'ws'.
305  * Returns 0 in case of error or 1 in case of success.
306  */
307 static int aws_read(struct afb_ws *ws, size_t size)
308 {
309         ssize_t sz;
310         char *buffer;
311
312         if (size != 0) {
313                 buffer = realloc(ws->buffer.buffer, ws->buffer.size + size + 1);
314                 if (buffer == NULL)
315                         return 0;
316                 ws->buffer.buffer = buffer;
317                 sz = websock_read(ws->ws, &buffer[ws->buffer.size], size);
318                 if ((size_t)sz != size)
319                         return 0;
320                 ws->buffer.size += size;
321         }
322         return 1;
323 }
324
325 /*
326  * Callback when 'close' command received from 'ws' with 'code' and 'size'.
327  */
328 static void aws_on_close(struct afb_ws *ws, uint16_t code, size_t size)
329 {
330         struct buf b;
331
332         ws->state = waiting;
333         free(aws_pick_buffer(ws).buffer);
334         if (ws->itf->on_close == NULL) {
335                 websock_drop(ws->ws);
336                 afb_ws_hangup(ws);
337         } else if (!aws_read(ws, size))
338                 ws->itf->on_close(ws->closure, code, NULL, 0);
339         else {
340                 b = aws_pick_buffer(ws);
341                 ws->itf->on_close(ws->closure, code, b.buffer, b.size);
342         }
343 }
344
345 /*
346  * Drops any incoming data and send an error of 'code'
347  */
348 static void aws_drop_error(struct afb_ws *ws, uint16_t code)
349 {
350         ws->state = waiting;
351         free(aws_pick_buffer(ws).buffer);
352         websock_drop(ws->ws);
353         websock_error(ws->ws, code, NULL, 0);
354 }
355
356 /*
357  * Reads either text or binary data of 'size' from 'ws' eventually 'last'.
358  */
359 static void aws_continue(struct afb_ws *ws, int last, size_t size)
360 {
361         struct buf b;
362         int istxt;
363
364         if (!aws_read(ws, size))
365                 aws_drop_error(ws, WEBSOCKET_CODE_ABNORMAL);
366         else if (last) {
367                 istxt = ws->state == reading_text;
368                 ws->state = waiting;
369                 b = aws_pick_buffer(ws);
370                 b.buffer[b.size] = 0;
371                 (istxt ? ws->itf->on_text : ws->itf->on_binary)(ws->closure, b.buffer, b.size);
372         }
373 }
374
375 /*
376  * Callback when 'text' message received from 'ws' with 'size' and possibly 'last'.
377  */
378 static void aws_on_text(struct afb_ws *ws, int last, size_t size)
379 {
380         if (ws->state != waiting)
381                 aws_drop_error(ws, WEBSOCKET_CODE_PROTOCOL_ERROR);
382         else if (ws->itf->on_text == NULL)
383                 aws_drop_error(ws, WEBSOCKET_CODE_CANT_ACCEPT);
384         else {
385                 ws->state = reading_text;
386                 aws_continue(ws, last, size);
387         }
388 }
389
390 /*
391  * Callback when 'binary' message received from 'ws' with 'size' and possibly 'last'.
392  */
393 static void aws_on_binary(struct afb_ws *ws, int last, size_t size)
394 {
395         if (ws->state != waiting)
396                 aws_drop_error(ws, WEBSOCKET_CODE_PROTOCOL_ERROR);
397         else if (ws->itf->on_binary == NULL)
398                 aws_drop_error(ws, WEBSOCKET_CODE_CANT_ACCEPT);
399         else {
400                 ws->state = reading_binary;
401                 aws_continue(ws, last, size);
402         }
403 }
404
405 /*
406  * Callback when 'close' command received from 'ws' with 'code' and 'size'.
407  */
408 static void aws_on_continue(struct afb_ws *ws, int last, size_t size)
409 {
410         if (ws->state == waiting)
411                 aws_drop_error(ws, WEBSOCKET_CODE_PROTOCOL_ERROR);
412         else
413                 aws_continue(ws, last, size);
414 }
415
416 /*
417  * Callback when 'close' command is sent to 'ws' with 'code' and 'size'.
418  */
419 static void aws_on_error(struct afb_ws *ws, uint16_t code, const void *data, size_t size)
420 {
421         if (ws->itf->on_error != NULL)
422                 ws->itf->on_error(ws->closure, code, data, size);
423         else
424                 afb_ws_hangup(ws);
425 }
426
427
428