Make websocket concurrent
[src/app-framework-binder.git] / src / afb-wsj1.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author: José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <assert.h>
23 #include <errno.h>
24 #include <string.h>
25 #include <stdio.h>
26 #include <pthread.h>
27
28 #include <json-c/json.h>
29
30 #include "afb-ws.h"
31 #include "afb-wsj1.h"
32
33 #define CALL 2
34 #define RETOK 3
35 #define RETERR 4
36 #define EVENT 5
37
38 static void wsj1_on_hangup(struct afb_wsj1 *wsj1);
39 static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size);
40
41 static struct afb_ws_itf wsj1_itf = {
42         .on_hangup = (void*)wsj1_on_hangup,
43         .on_text = (void*)wsj1_on_text
44 };
45
46 struct wsj1_call
47 {
48         struct wsj1_call *next;
49         void (*callback)(void *, struct afb_wsj1_msg *);
50         void *closure;
51         char id[16];
52 };
53
54 struct afb_wsj1_msg
55 {
56         int refcount;
57         struct afb_wsj1 *wsj1;
58         struct afb_wsj1_msg *next, *previous;
59         char *text;
60         int code;
61         char *id;
62         char *api;
63         char *verb;
64         char *event;
65         char *object_s;
66         size_t object_s_length;
67         char *token;
68         struct json_object *object_j;
69 };
70
71 struct afb_wsj1
72 {
73         int refcount;
74         int genid;
75         struct afb_wsj1_itf *itf;
76         void *closure;
77         struct json_tokener *tokener;
78         struct afb_ws *ws;
79         struct afb_wsj1_msg *messages;
80         struct wsj1_call *calls;
81         pthread_mutex_t mutex;
82 };
83
84 struct afb_wsj1 *afb_wsj1_create(struct sd_event *eloop, int fd, struct afb_wsj1_itf *itf, void *closure)
85 {
86         struct afb_wsj1 *result;
87
88         assert(fd >= 0);
89
90         result = calloc(1, sizeof * result);
91         if (result == NULL)
92                 goto error;
93
94         result->refcount = 1;
95         result->itf = itf;
96         result->closure = closure;
97         pthread_mutex_init(&result->mutex, NULL);
98
99         result->tokener = json_tokener_new();
100         if (result->tokener == NULL)
101                 goto error2;
102
103         result->ws = afb_ws_create(eloop, fd, &wsj1_itf, result);
104         if (result->ws == NULL)
105                 goto error3;
106
107         return result;
108
109 error3:
110         json_tokener_free(result->tokener);
111 error2:
112         free(result);
113 error:
114         close(fd);
115         return NULL;
116 }
117
118 void afb_wsj1_addref(struct afb_wsj1 *wsj1)
119 {
120         if (wsj1)
121                 __atomic_add_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED);
122 }
123
124 void afb_wsj1_unref(struct afb_wsj1 *wsj1)
125 {
126         if (wsj1 && !__atomic_sub_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED)) {
127                 afb_ws_destroy(wsj1->ws);
128                 json_tokener_free(wsj1->tokener);
129                 free(wsj1);
130         }
131 }
132
133 static void wsj1_on_hangup(struct afb_wsj1 *wsj1)
134 {
135         if (wsj1->itf->on_hangup != NULL)
136                 wsj1->itf->on_hangup(wsj1->closure, wsj1);
137 }
138
139
140 static struct wsj1_call *wsj1_locked_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
141 {
142         struct wsj1_call *r, **p;
143
144         p = &wsj1->calls;
145         while((r = *p) != NULL) {
146                 if (strcmp(r->id, id) == 0) {
147                         if (remove)
148                                 *p = r->next;
149                         break;
150                 }
151                 p = &r->next;
152         }
153
154         return r;
155 }
156
157 static struct wsj1_call *wsj1_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
158 {
159         struct wsj1_call *r;
160
161         pthread_mutex_lock(&wsj1->mutex);
162         r = wsj1_locked_call_search(wsj1, id, remove);
163         pthread_mutex_unlock(&wsj1->mutex);
164
165         return r;
166 }
167
168 static struct wsj1_call *wsj1_call_create(struct afb_wsj1 *wsj1, void (*on_reply)(void*,struct afb_wsj1_msg*), void *closure)
169 {
170         struct wsj1_call *call = malloc(sizeof *call);
171         if (call == NULL)
172                 errno = ENOMEM;
173         else {
174                 pthread_mutex_lock(&wsj1->mutex);
175                 do {
176                         if (wsj1->genid == 0)
177                                 wsj1->genid = 999999;
178                         sprintf(call->id, "%d", wsj1->genid--);
179                 } while (wsj1_locked_call_search(wsj1, call->id, 0) != NULL);
180                 call->callback = on_reply;
181                 call->closure = closure;
182                 call->next = wsj1->calls;
183                 wsj1->calls = call;
184                 pthread_mutex_unlock(&wsj1->mutex);
185         }
186         return call;
187 }
188
189
190 static int wsj1_msg_scan(char *text, size_t items[10][2])
191 {
192         char *pos, *beg, *end, c;
193         int aux, n = 0;
194
195         /* scan */
196         pos = text;
197
198         /* scans: [ */
199         while(*pos == ' ') pos++;
200         if (*pos++ != '[') goto bad_scan;
201
202         /* scans list */
203         while(*pos == ' ') pos++;
204         if (*pos != ']') {
205                 for (;;) {
206                         if (n == 10)
207                                 goto bad_scan;
208                         beg = pos;
209                         aux = 0;
210                         while (aux != 0 || (*pos != ',' && *pos != ']')) {
211                                 switch(*pos++) {
212                                 case '{': case '[': aux++; break;
213                                 case '}': case ']': if (aux--) break;
214                                 case 0: goto bad_scan;
215                                 case '"':
216                                         do {
217                                                 switch(c = *pos++) {
218                                                 case '\\': if (*pos++) break;
219                                                 case 0: goto bad_scan;
220                                                 }
221                                         } while(c != '"');
222                                 }
223                         }
224                         end = pos;
225                         while (end > beg && end[-1] == ' ')
226                                 end--;
227                         items[n][0] = beg - text; /* start offset */
228                         items[n][1] = end - beg;  /* length */
229                         n++;
230                         if (*pos == ']')
231                                 break;
232                         while(*++pos == ' ');
233                 }
234         }
235         while(*++pos == ' ');
236         if (*pos) goto bad_scan;
237         return n;
238
239 bad_scan:
240         return -1;
241 }
242
243 static char *wsj1_msg_parse_extract(char *text, size_t offset, size_t size)
244 {
245         text[offset + size] = 0;
246         return text + offset;
247 }
248
249 static char *wsj1_msg_parse_string(char *text, size_t offset, size_t size)
250 {
251         if (size > 1 && text[offset] == '"') {
252                 offset += 1;
253                 size -= 2;
254         }
255         return wsj1_msg_parse_extract(text, offset, size);
256 }
257
258 static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size)
259 {
260         size_t items[10][2];
261         int n;
262         struct afb_wsj1_msg *msg;
263         struct wsj1_call *call = NULL;
264
265         /* allocate */
266         msg = calloc(1, sizeof *msg);
267         if (msg == NULL)
268                 goto alloc_error;
269
270         /* scan */
271         n = wsj1_msg_scan(text, items);
272         if (n < 0)
273                 goto bad_header;
274
275         /* scans code: 2|3|4|5 */
276         if (items[0][1] != 1) goto bad_header;
277         switch (text[items[0][0]]) {
278         case '2': msg->code = CALL; break;
279         case '3': msg->code = RETOK; break;
280         case '4': msg->code = RETERR; break;
281         case '5': msg->code = EVENT; break;
282         default: goto bad_header;
283         }
284
285         /* fills the message */
286         switch (msg->code) {
287         case CALL:
288                 if (n != 4 && n != 5) goto bad_header;
289                 msg->id = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
290                 msg->api = wsj1_msg_parse_string(text, items[2][0], items[2][1]);
291                 msg->verb = strchr(msg->api, '/');
292                 if (msg->verb == NULL) goto bad_header;
293                 *msg->verb++ = 0;
294                 msg->object_s = wsj1_msg_parse_extract(text, items[3][0], items[3][1]);
295                 msg->object_s_length = items[3][1];
296                 msg->token = n == 5 ? wsj1_msg_parse_string(text, items[4][0], items[4][1]) : NULL;
297                 break;
298         case RETOK:
299         case RETERR:
300                 if (n != 3 && n != 4) goto bad_header;
301                 msg->id = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
302                 call = wsj1_call_search(wsj1, msg->id, 1);
303                 if (call == NULL) goto bad_header;
304                 msg->object_s = wsj1_msg_parse_extract(text, items[2][0], items[2][1]);
305                 msg->object_s_length = items[2][1];
306                 msg->token = n == 5 ? wsj1_msg_parse_string(text, items[3][0], items[3][1]) : NULL;
307                 break;
308         case EVENT:
309                 if (n != 3) goto bad_header;
310                 msg->event = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
311                 msg->object_s = wsj1_msg_parse_extract(text, items[2][0], items[2][1]);
312                 msg->object_s_length = items[2][1];
313                 break;
314         }
315         /* done */
316         msg->text = text;
317
318         /* fill and record the request */
319         msg->refcount = 1;
320         afb_wsj1_addref(wsj1);
321         msg->wsj1 = wsj1;
322         pthread_mutex_lock(&wsj1->mutex);
323         msg->next = wsj1->messages;
324         if (msg->next != NULL)
325                 msg->next->previous = msg;
326         wsj1->messages = msg;
327         pthread_mutex_unlock(&wsj1->mutex);
328
329         /* incoke the handler */
330         switch (msg->code) {
331         case CALL:
332                 wsj1->itf->on_call(wsj1->closure, msg->api, msg->verb, msg);
333                 break;
334         case RETOK:
335         case RETERR:
336                 call->callback(call->closure, msg);
337                 free(call);
338                 break;
339         case EVENT:
340                 wsj1->itf->on_event(wsj1->closure, msg->event, msg);
341                 break;
342         }
343         afb_wsj1_msg_unref(msg);
344         return;
345
346 bad_header:
347         free(msg);
348 alloc_error:
349         free(text);
350         afb_ws_close(wsj1->ws, 1008, NULL);
351 }
352
353 void afb_wsj1_msg_addref(struct afb_wsj1_msg *msg)
354 {
355         if (msg != NULL)
356                 __atomic_add_fetch(&msg->refcount, 1, __ATOMIC_RELAXED);
357 }
358
359 void afb_wsj1_msg_unref(struct afb_wsj1_msg *msg)
360 {
361         if (msg != NULL && !__atomic_sub_fetch(&msg->refcount, 1, __ATOMIC_RELAXED)) {
362                 /* unlink the message */
363                 pthread_mutex_lock(&msg->wsj1->mutex);
364                 if (msg->next != NULL)
365                         msg->next->previous = msg->previous;
366                 if (msg->previous == NULL)
367                         msg->wsj1->messages = msg->next;
368                 else
369                         msg->previous->next = msg->next;
370                 pthread_mutex_unlock(&msg->wsj1->mutex);
371                 /* free ressources */
372                 afb_wsj1_unref(msg->wsj1);
373                 json_object_put(msg->object_j);
374                 free(msg->text);
375                 free(msg);
376         }
377 }
378
379 const char *afb_wsj1_msg_object_s(struct afb_wsj1_msg *msg)
380 {
381         return msg->object_s;
382 }
383
384 struct json_object *afb_wsj1_msg_object_j(struct afb_wsj1_msg *msg)
385 {
386         struct json_object *object = msg->object_j;
387         if (object == NULL) {
388                 pthread_mutex_lock(&msg->wsj1->mutex);
389                 json_tokener_reset(msg->wsj1->tokener);
390                 object = json_tokener_parse_ex(msg->wsj1->tokener, msg->object_s, (int)msg->object_s_length);
391                 pthread_mutex_unlock(&msg->wsj1->mutex);
392                 if (object == NULL) {
393                         /* lazy error detection of json request. Is it to improve? */
394                         object = json_object_new_string_len(msg->object_s, (int)msg->object_s_length);
395                 }
396                 msg->object_j = object;
397         }
398         return object;
399 }
400
401 int afb_wsj1_msg_is_call(struct afb_wsj1_msg *msg)
402 {
403         return msg->code == CALL;
404 }
405
406 int afb_wsj1_msg_is_reply(struct afb_wsj1_msg *msg)
407 {
408         return msg->code == RETOK || msg->code == RETERR;
409 }
410
411 int afb_wsj1_msg_is_reply_ok(struct afb_wsj1_msg *msg)
412 {
413         return msg->code == RETOK;
414 }
415
416 int afb_wsj1_msg_is_reply_error(struct afb_wsj1_msg *msg)
417 {
418         return msg->code == RETERR;
419 }
420
421 int afb_wsj1_msg_is_event(struct afb_wsj1_msg *msg)
422 {
423         return msg->code == EVENT;
424 }
425
426 const char *afb_wsj1_msg_api(struct afb_wsj1_msg *msg)
427 {
428         return msg->api;
429 }
430
431 const char *afb_wsj1_msg_verb(struct afb_wsj1_msg *msg)
432 {
433         return msg->verb;
434 }
435
436 const char *afb_wsj1_msg_event(struct afb_wsj1_msg *msg)
437 {
438         return msg->event;
439 }
440
441 const char *afb_wsj1_msg_token(struct afb_wsj1_msg *msg)
442 {
443         return msg->token;
444 }
445
446 struct afb_wsj1 *afb_wsj1_msg_wsj1(struct afb_wsj1_msg *msg)
447 {
448         return msg->wsj1;
449 }
450
451 int afb_wsj1_close(struct afb_wsj1 *wsj1, uint16_t code, const char *text)
452 {
453         return afb_ws_close(wsj1->ws, code, text);
454 }
455
456 static int wsj1_send_isot(struct afb_wsj1 *wsj1, int i1, const char *s1, const char *o1, const char *t1)
457 {
458         char code[2] = { (char)('0' + i1), 0 };
459         return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",", o1 == NULL ? "null" : o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL);
460 }
461
462 static int wsj1_send_issot(struct afb_wsj1 *wsj1, int i1, const char *s1, const char *s2, const char *o1, const char *t1)
463 {
464         char code[2] = { (char)('0' + i1), 0 };
465         return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",\"", s2, "\",", o1 == NULL ? "null" : o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL);
466 }
467
468 int afb_wsj1_send_event_j(struct afb_wsj1 *wsj1, const char *event, struct json_object *object)
469 {
470         const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
471         int rc = afb_wsj1_send_event_s(wsj1, event, objstr);
472         json_object_put(object);
473         return rc;
474 }
475
476 int afb_wsj1_send_event_s(struct afb_wsj1 *wsj1, const char *event, const char *object)
477 {
478         return wsj1_send_isot(wsj1, EVENT, event, object, NULL);
479 }
480
481 int afb_wsj1_call_j(struct afb_wsj1 *wsj1, const char *api, const char *verb, struct json_object *object, void (*on_reply)(void *closure, struct afb_wsj1_msg *msg), void *closure)
482 {
483         const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
484         int rc = afb_wsj1_call_s(wsj1, api, verb, objstr, on_reply, closure);
485         json_object_put(object);
486         return rc;
487 }
488
489 int afb_wsj1_call_s(struct afb_wsj1 *wsj1, const char *api, const char *verb, const char *object, void (*on_reply)(void *closure, struct afb_wsj1_msg *msg), void *closure)
490 {
491         int rc;
492         struct wsj1_call *call;
493         char *tag;
494
495         /* allocates the call */
496         call = wsj1_call_create(wsj1, on_reply, closure);
497         if (call == NULL) {
498                 errno = ENOMEM;
499                 return -1;
500         }
501
502         /* makes the tag */
503         tag = alloca(2 + strlen(api) + strlen(verb));
504         stpcpy(stpcpy(stpcpy(tag, api), "/"), verb);
505
506         /* makes the call */
507         rc = wsj1_send_issot(wsj1, CALL, call->id, tag, object, NULL);
508         if (rc < 0) {
509                 wsj1_call_search(wsj1, call->id, 1);
510                 free(call);
511         }
512         return rc;
513 }
514
515 int afb_wsj1_reply_j(struct afb_wsj1_msg *msg, struct json_object *object, const char *token, int iserror)
516 {
517         const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
518         int rc = afb_wsj1_reply_s(msg, objstr, token, iserror);
519         json_object_put(object);
520         return rc;
521 }
522
523 int afb_wsj1_reply_s(struct afb_wsj1_msg *msg, const char *object, const char *token, int iserror)
524 {
525         return wsj1_send_isot(msg->wsj1, iserror ? RETERR : RETOK, msg->id, object, token);
526 }
527