example of integration with websocket in C
[src/app-framework-binder.git] / src / afb-ws.c
1 /*
2  * Copyright 2016 IoT.bzh
3  * Author: José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19 #include <stdlib.h>
20 #include <unistd.h>
21 #include <stdint.h>
22 #include <assert.h>
23 #include <errno.h>
24 #include <sys/uio.h>
25 #include <string.h>
26 #include <stdarg.h>
27
28 #include <systemd/sd-event.h>
29
30 #include "websock.h"
31 #include "afb-ws.h"
32
33 #include "afb-common.h"
34
35 /*
36  * declaration of the websock interface for afb-ws
37  */
38 static ssize_t aws_writev(struct afb_ws *ws, const struct iovec *iov, int iovcnt);
39 static ssize_t aws_readv(struct afb_ws *ws, const struct iovec *iov, int iovcnt);
40 static void aws_on_close(struct afb_ws *ws, uint16_t code, size_t size);
41 static void aws_on_text(struct afb_ws *ws, int last, size_t size);
42 static void aws_on_binary(struct afb_ws *ws, int last, size_t size);
43 static void aws_on_continue(struct afb_ws *ws, int last, size_t size);
44 static void aws_on_readable(struct afb_ws *ws);
45 static void aws_on_error(struct afb_ws *ws, uint16_t code, const void *data, size_t size);
46
47 static struct websock_itf aws_itf = {
48         .writev = (void*)aws_writev,
49         .readv = (void*)aws_readv,
50
51         .on_ping = NULL,
52         .on_pong = NULL,
53         .on_close = (void*)aws_on_close,
54         .on_text = (void*)aws_on_text,
55         .on_binary = (void*)aws_on_binary,
56         .on_continue = (void*)aws_on_continue,
57         .on_extension = NULL,
58
59         .on_error = (void*)aws_on_error
60 };
61
62 /*
63  * a common scheme of buffer handling
64  */
65 struct buf
66 {
67         char *buffer;
68         size_t size;
69 };
70
71 /*
72  * the state
73  */
74 enum state
75 {
76         waiting,
77         reading_text,
78         reading_binary
79 };
80
81 /*
82  * the afb_ws structure
83  */
84 struct afb_ws
85 {
86         int fd;                 /* the socket file descriptor */
87         enum state state;       /* current state */
88         const struct afb_ws_itf *itf; /* the callback interface */
89         void *closure;          /* closure when calling the callbacks */
90         struct websock *ws;     /* the websock handler */
91         sd_event_source *evsrc; /* the event source for the socket */
92         struct buf buffer;      /* the last read fragment */
93 };
94
95 /*
96  * Returns the current buffer of 'ws' that is reset.
97  */
98 static inline struct buf aws_pick_buffer(struct afb_ws *ws)
99 {
100         struct buf result = ws->buffer;
101         ws->buffer.buffer = NULL;
102         ws->buffer.size = 0;
103         return result;
104 }
105
106 /*
107  * Disconnect the websocket 'ws' and calls on_hangup if
108  * 'call_on_hangup' is not null.
109  */
110 static void aws_disconnect(struct afb_ws *ws, int call_on_hangup)
111 {
112         struct websock *wsi = ws->ws;
113         if (wsi != NULL) {
114                 ws->ws = NULL;
115                 sd_event_source_unref(ws->evsrc);
116                 ws->evsrc = NULL;
117                 websock_destroy(wsi);
118                 free(aws_pick_buffer(ws).buffer);
119                 ws->state = waiting;
120                 if (call_on_hangup && ws->itf->on_hangup)
121                         ws->itf->on_hangup(ws->closure);
122         }
123 }
124
125 static int io_event_callback(sd_event_source *src, int fd, uint32_t revents, void *ws)
126 {
127         if ((revents & EPOLLIN) != 0)
128                 aws_on_readable(ws);
129         if ((revents & EPOLLHUP) != 0)
130                 afb_ws_hangup(ws);
131         return 0;
132 }
133
134 /*
135  * Creates the afb_ws structure for the file descritor
136  * 'fd' and the callbacks described by the interface 'itf'
137  * and its 'closure'.
138  *
139  * Returns the handle for the afb_ws created or NULL on error.
140  */
141 struct afb_ws *afb_ws_create(int fd, const struct afb_ws_itf *itf, void *closure)
142 {
143         int rc;
144         struct afb_ws *result;
145
146         assert(fd >= 0);
147
148         /* allocation */
149         result = malloc(sizeof * result);
150         if (result == NULL)
151                 goto error;
152
153         /* init */
154         result->fd = fd;
155         result->state = waiting;
156         result->itf = itf;
157         result->closure = closure;
158         result->buffer.buffer = NULL;
159         result->buffer.size = 0;
160
161         /* creates the websocket */
162         result->ws = websock_create_v13(&aws_itf, result);
163         if (result->ws == NULL)
164                 goto error2;
165
166         /* creates the evsrc */
167         rc = sd_event_add_io(afb_common_get_event_loop(), &result->evsrc, result->fd, EPOLLIN, io_event_callback, result);
168         if (rc < 0) {
169                 errno = -rc;
170                 goto error3;
171         }
172         return result;
173
174 error3:
175         websock_destroy(result->ws);
176 error2:
177         free(result);
178 error:
179         return NULL;
180 }
181
182 /*
183  * Destroys the websocket 'ws'
184  * It first hangup (but without calling on_hangup for safety reasons)
185  * if needed.
186  */
187 void afb_ws_destroy(struct afb_ws *ws)
188 {
189         aws_disconnect(ws, 0);
190         free(ws);
191 }
192
193 /*
194  * Hangup the websocket 'ws'
195  */
196 void afb_ws_hangup(struct afb_ws *ws)
197 {
198         aws_disconnect(ws, 1);
199 }
200
201 /*
202  * Sends a 'close' command to the endpoint of 'ws' with the 'code' and the
203  * 'reason' (that can be NULL and that else should not be greater than 123
204  * characters).
205  * Returns 0 on success or -1 in case of error.
206  */
207 int afb_ws_close(struct afb_ws *ws, uint16_t code, const char *reason)
208 {
209         if (ws->ws == NULL) {
210                 /* disconnected */
211                 errno = EPIPE;
212                 return -1;
213         }
214         return websock_close(ws->ws, code, reason, reason == NULL ? 0 : strlen(reason));
215 }
216
217 /*
218  * Sends a 'close' command to the endpoint of 'ws' with the 'code' and the
219  * 'reason' (that can be NULL and that else should not be greater than 123
220  * characters).
221  * Raise an error after 'close' command is sent.
222  * Returns 0 on success or -1 in case of error.
223  */
224 int afb_ws_error(struct afb_ws *ws, uint16_t code, const char *reason)
225 {
226         if (ws->ws == NULL) {
227                 /* disconnected */
228                 errno = EPIPE;
229                 return -1;
230         }
231         return websock_error(ws->ws, code, reason, reason == NULL ? 0 : strlen(reason));
232 }
233
234 /*
235  * Sends a 'text' of 'length' to the endpoint of 'ws'.
236  * Returns 0 on success or -1 in case of error.
237  */
238 int afb_ws_text(struct afb_ws *ws, const char *text, size_t length)
239 {
240         if (ws->ws == NULL) {
241                 /* disconnected */
242                 errno = EPIPE;
243                 return -1;
244         }
245         return websock_text(ws->ws, 1, text, length);
246 }
247
248 /*
249  * Sends a variable list of texts to the endpoint of 'ws'.
250  * Returns 0 on success or -1 in case of error.
251  */
252 int afb_ws_texts(struct afb_ws *ws, ...)
253 {
254         va_list args;
255         struct iovec ios[32];
256         int count;
257         const char *s;
258
259         if (ws->ws == NULL) {
260                 /* disconnected */
261                 errno = EPIPE;
262                 return -1;
263         }
264
265         count = 0;
266         va_start(args, ws);
267         s = va_arg(args, const char *);
268         while (s != NULL) {
269                 if (count == 32) {
270                         errno = EINVAL;
271                         return -1;
272                 }
273                 ios[count].iov_base = (void*)s;
274                 ios[count].iov_len = strlen(s);
275                 count++;
276                 s = va_arg(args, const char *);
277         }
278         va_end(args);
279         return websock_text_v(ws->ws, 1, ios, count);
280 }
281
282 /*
283  * Sends a binary 'data' of 'length' to the endpoint of 'ws'.
284  * Returns 0 on success or -1 in case of error.
285  */
286 int afb_ws_binary(struct afb_ws *ws, const void *data, size_t length)
287 {
288         if (ws->ws == NULL) {
289                 /* disconnected */
290                 errno = EPIPE;
291                 return -1;
292         }
293         return websock_binary(ws->ws, 1, data, length);
294 }
295
296 /*
297  * callback for writing data
298  */
299 static ssize_t aws_writev(struct afb_ws *ws, const struct iovec *iov, int iovcnt)
300 {
301         ssize_t rc;
302         do {
303                 rc = writev(ws->fd, iov, iovcnt);
304         } while(rc == -1 && errno == EINTR);
305         return rc;
306 }
307
308 /*
309  * callback for reading data
310  */
311 static ssize_t aws_readv(struct afb_ws *ws, const struct iovec *iov, int iovcnt)
312 {
313         ssize_t rc;
314         do {
315                 rc = readv(ws->fd, iov, iovcnt);
316         } while(rc == -1 && errno == EINTR);
317         if (rc == 0) {
318                 errno = EPIPE;
319                 rc = -1;
320         }
321         return rc;
322 }
323
324 /*
325  * callback on incoming data
326  */
327 static void aws_on_readable(struct afb_ws *ws)
328 {
329         int rc;
330
331         assert(ws->ws != NULL);
332         rc = websock_dispatch(ws->ws);
333         if (rc < 0 && errno == EPIPE)
334                 afb_ws_hangup(ws);
335 }
336
337 /*
338  * Reads from the websocket handled by 'ws' data of length 'size'
339  * and append it to the current buffer of 'ws'.
340  * Returns 0 in case of error or 1 in case of success.
341  */
342 static int aws_read(struct afb_ws *ws, size_t size)
343 {
344         ssize_t sz;
345         char *buffer;
346
347         if (size != 0) {
348                 buffer = realloc(ws->buffer.buffer, ws->buffer.size + size + 1);
349                 if (buffer == NULL)
350                         return 0;
351                 ws->buffer.buffer = buffer;
352                 sz = websock_read(ws->ws, &buffer[ws->buffer.size], size);
353                 if ((size_t)sz != size)
354                         return 0;
355                 ws->buffer.size += size;
356         }
357         return 1;
358 }
359
360 /*
361  * Callback when 'close' command received from 'ws' with 'code' and 'size'.
362  */
363 static void aws_on_close(struct afb_ws *ws, uint16_t code, size_t size)
364 {
365         struct buf b;
366
367         ws->state = waiting;
368         free(aws_pick_buffer(ws).buffer);
369         if (ws->itf->on_close == NULL) {
370                 websock_drop(ws->ws);
371                 afb_ws_hangup(ws);
372         } else if (!aws_read(ws, size))
373                 ws->itf->on_close(ws->closure, code, NULL, 0);
374         else {
375                 b = aws_pick_buffer(ws);
376                 ws->itf->on_close(ws->closure, code, b.buffer, b.size);
377         }
378 }
379
380 /*
381  * Drops any incoming data and send an error of 'code'
382  */
383 static void aws_drop_error(struct afb_ws *ws, uint16_t code)
384 {
385         ws->state = waiting;
386         free(aws_pick_buffer(ws).buffer);
387         websock_drop(ws->ws);
388         websock_error(ws->ws, code, NULL, 0);
389 }
390
391 /*
392  * Reads either text or binary data of 'size' from 'ws' eventually 'last'.
393  */
394 static void aws_continue(struct afb_ws *ws, int last, size_t size)
395 {
396         struct buf b;
397         int istxt;
398
399         if (!aws_read(ws, size))
400                 aws_drop_error(ws, WEBSOCKET_CODE_ABNORMAL);
401         else if (last) {
402                 istxt = ws->state == reading_text;
403                 ws->state = waiting;
404                 b = aws_pick_buffer(ws);
405                 b.buffer[b.size] = 0;
406                 (istxt ? ws->itf->on_text : ws->itf->on_binary)(ws->closure, b.buffer, b.size);
407         }
408 }
409
410 /*
411  * Callback when 'text' message received from 'ws' with 'size' and possibly 'last'.
412  */
413 static void aws_on_text(struct afb_ws *ws, int last, size_t size)
414 {
415         if (ws->state != waiting)
416                 aws_drop_error(ws, WEBSOCKET_CODE_PROTOCOL_ERROR);
417         else if (ws->itf->on_text == NULL)
418                 aws_drop_error(ws, WEBSOCKET_CODE_CANT_ACCEPT);
419         else {
420                 ws->state = reading_text;
421                 aws_continue(ws, last, size);
422         }
423 }
424
425 /*
426  * Callback when 'binary' message received from 'ws' with 'size' and possibly 'last'.
427  */
428 static void aws_on_binary(struct afb_ws *ws, int last, size_t size)
429 {
430         if (ws->state != waiting)
431                 aws_drop_error(ws, WEBSOCKET_CODE_PROTOCOL_ERROR);
432         else if (ws->itf->on_binary == NULL)
433                 aws_drop_error(ws, WEBSOCKET_CODE_CANT_ACCEPT);
434         else {
435                 ws->state = reading_binary;
436                 aws_continue(ws, last, size);
437         }
438 }
439
440 /*
441  * Callback when 'close' command received from 'ws' with 'code' and 'size'.
442  */
443 static void aws_on_continue(struct afb_ws *ws, int last, size_t size)
444 {
445         if (ws->state == waiting)
446                 aws_drop_error(ws, WEBSOCKET_CODE_PROTOCOL_ERROR);
447         else
448                 aws_continue(ws, last, size);
449 }
450
451 /*
452  * Callback when 'close' command is sent to 'ws' with 'code' and 'size'.
453  */
454 static void aws_on_error(struct afb_ws *ws, uint16_t code, const void *data, size_t size)
455 {
456         if (ws->itf->on_error != NULL)
457                 ws->itf->on_error(ws->closure, code, data, size);
458         else
459                 afb_ws_hangup(ws);
460 }
461
462