4080edf27488664aa70b865e887b0369f9d598c3
[src/app-framework-binder.git] / src / afb-wsj1.c
1 /*
2  * Copyright (C) 2016-2019 "IoT.bzh"
3  * Author: José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <assert.h>
23 #include <errno.h>
24 #include <string.h>
25 #include <stdio.h>
26 #include <pthread.h>
27
28 #include <json-c/json.h>
29 #if !defined(JSON_C_TO_STRING_NOSLASHESCAPE)
30 #define JSON_C_TO_STRING_NOSLASHESCAPE 0
31 #endif
32
33 #include "afb-ws.h"
34 #include "afb-wsj1.h"
35 #include "fdev.h"
36
37 #define CALL 2
38 #define RETOK 3
39 #define RETERR 4
40 #define EVENT 5
41
42 #define WEBSOCKET_CODE_POLICY_VIOLATION  1008
43 #define WEBSOCKET_CODE_INTERNAL_ERROR    1011
44
45 static void wsj1_on_hangup(struct afb_wsj1 *wsj1);
46 static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size);
47 static struct afb_wsj1_msg *wsj1_msg_make(struct afb_wsj1 *wsj1, char *text, size_t size);
48
49 static struct afb_ws_itf wsj1_itf = {
50         .on_hangup = (void*)wsj1_on_hangup,
51         .on_text = (void*)wsj1_on_text
52 };
53
54 struct wsj1_call
55 {
56         struct wsj1_call *next;
57         void (*callback)(void *, struct afb_wsj1_msg *);
58         void *closure;
59         char id[16];
60 };
61
62 struct afb_wsj1_msg
63 {
64         int refcount;
65         struct afb_wsj1 *wsj1;
66         struct afb_wsj1_msg *next, *previous;
67         char *text;
68         int code;
69         const char *id;
70         const char *api;
71         const char *verb;
72         const char *event;
73         const char *object_s;
74         size_t object_s_length;
75         const char *token;
76         struct json_object *object_j;
77 };
78
79 struct afb_wsj1
80 {
81         int refcount;
82         int genid;
83         struct afb_wsj1_itf *itf;
84         void *closure;
85         struct json_tokener *tokener;
86         struct afb_ws *ws;
87         struct afb_wsj1_msg *messages;
88         struct wsj1_call *calls;
89         pthread_mutex_t mutex;
90 };
91
92 struct afb_wsj1 *afb_wsj1_create(struct fdev *fdev, struct afb_wsj1_itf *itf, void *closure)
93 {
94         struct afb_wsj1 *result;
95
96         assert(fdev);
97         assert(itf);
98         assert(itf->on_call);
99
100         result = calloc(1, sizeof * result);
101         if (result == NULL)
102                 goto error;
103
104         result->refcount = 1;
105         result->itf = itf;
106         result->closure = closure;
107         pthread_mutex_init(&result->mutex, NULL);
108
109         result->tokener = json_tokener_new();
110         if (result->tokener == NULL)
111                 goto error2;
112
113         result->ws = afb_ws_create(fdev, &wsj1_itf, result);
114         if (result->ws == NULL)
115                 goto error3;
116
117         return result;
118
119 error3:
120         json_tokener_free(result->tokener);
121 error2:
122         free(result);
123 error:
124         fdev_unref(fdev);
125         return NULL;
126 }
127
128 void afb_wsj1_addref(struct afb_wsj1 *wsj1)
129 {
130         if (wsj1)
131                 __atomic_add_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED);
132 }
133
134 void afb_wsj1_unref(struct afb_wsj1 *wsj1)
135 {
136         if (wsj1 && !__atomic_sub_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED)) {
137                 afb_ws_destroy(wsj1->ws);
138                 json_tokener_free(wsj1->tokener);
139                 free(wsj1);
140         }
141 }
142
143 static void wsj1_on_hangup(struct afb_wsj1 *wsj1)
144 {
145         struct wsj1_call *call, *ncall;
146         struct afb_wsj1_msg *msg;
147         char *text;
148         int len;
149
150         static const char error_object_str[] = "{"
151                 "\"jtype\":\"afb-reply\","
152                 "\"request\":{"
153                         "\"status\":\"disconnected\","
154                         "\"info\":\"server hung up\"}}";
155
156         ncall = __atomic_exchange_n(&wsj1->calls, NULL, __ATOMIC_RELAXED);
157         while (ncall) {
158                 call = ncall;
159                 ncall = call->next;
160                 len = asprintf(&text, "[%d,\"%s\",%s]", RETERR, call->id, error_object_str);
161                 if (len > 0) {
162                         msg = wsj1_msg_make(wsj1, text, (size_t)len);
163                         if (msg != NULL) {
164                                 call->callback(call->closure, msg);
165                                 afb_wsj1_msg_unref(msg);
166                         }
167                 }
168                 free(call);
169         }
170
171         if (wsj1->itf->on_hangup != NULL)
172                 wsj1->itf->on_hangup(wsj1->closure, wsj1);
173 }
174
175
176 static struct wsj1_call *wsj1_locked_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
177 {
178         struct wsj1_call *r, **p;
179
180         p = &wsj1->calls;
181         while((r = *p) != NULL) {
182                 if (strcmp(r->id, id) == 0) {
183                         if (remove)
184                                 *p = r->next;
185                         break;
186                 }
187                 p = &r->next;
188         }
189
190         return r;
191 }
192
193 static struct wsj1_call *wsj1_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
194 {
195         struct wsj1_call *r;
196
197         pthread_mutex_lock(&wsj1->mutex);
198         r = wsj1_locked_call_search(wsj1, id, remove);
199         pthread_mutex_unlock(&wsj1->mutex);
200
201         return r;
202 }
203
204 static struct wsj1_call *wsj1_call_create(struct afb_wsj1 *wsj1, void (*on_reply)(void*,struct afb_wsj1_msg*), void *closure)
205 {
206         struct wsj1_call *call = malloc(sizeof *call);
207         if (call == NULL)
208                 errno = ENOMEM;
209         else {
210                 pthread_mutex_lock(&wsj1->mutex);
211                 do {
212                         if (wsj1->genid == 0)
213                                 wsj1->genid = 999999;
214                         sprintf(call->id, "%d", wsj1->genid--);
215                 } while (wsj1_locked_call_search(wsj1, call->id, 0) != NULL);
216                 call->callback = on_reply;
217                 call->closure = closure;
218                 call->next = wsj1->calls;
219                 wsj1->calls = call;
220                 pthread_mutex_unlock(&wsj1->mutex);
221         }
222         return call;
223 }
224
225
226 static int wsj1_msg_scan(char *text, size_t items[10][2])
227 {
228         char *pos, *beg, *end, c;
229         int aux, n = 0;
230
231         /* scan */
232         pos = text;
233
234         /* scans: [ */
235         while(*pos == ' ') pos++;
236         if (*pos++ != '[') goto bad_scan;
237
238         /* scans list */
239         while(*pos == ' ') pos++;
240         if (*pos != ']') {
241                 for (;;) {
242                         if (n == 10)
243                                 goto bad_scan;
244                         beg = pos;
245                         aux = 0;
246                         while (aux != 0 || (*pos != ',' && *pos != ']')) {
247                                 switch(*pos++) {
248                                 case '{': case '[': aux++; break;
249                                 case '}': case ']': if (aux--) break;
250                                 case 0: goto bad_scan;
251                                 case '"':
252                                         do {
253                                                 switch(c = *pos++) {
254                                                 case '\\': if (*pos++) break;
255                                                 case 0: goto bad_scan;
256                                                 }
257                                         } while(c != '"');
258                                 }
259                         }
260                         end = pos;
261                         while (end > beg && end[-1] == ' ')
262                                 end--;
263                         items[n][0] = beg - text; /* start offset */
264                         items[n][1] = end - beg;  /* length */
265                         n++;
266                         if (*pos == ']')
267                                 break;
268                         while(*++pos == ' ');
269                 }
270         }
271         while(*++pos == ' ');
272         if (*pos) goto bad_scan;
273         return n;
274
275 bad_scan:
276         return -1;
277 }
278
279 static char *wsj1_msg_parse_extract(char *text, size_t offset, size_t size)
280 {
281         text[offset + size] = 0;
282         return text + offset;
283 }
284
285 static char *wsj1_msg_parse_string(char *text, size_t offset, size_t size)
286 {
287         if (size > 1 && text[offset] == '"') {
288                 offset += 1;
289                 size -= 2;
290         }
291         return wsj1_msg_parse_extract(text, offset, size);
292 }
293
294 static struct afb_wsj1_msg *wsj1_msg_make(struct afb_wsj1 *wsj1, char *text, size_t size)
295 {
296         size_t items[10][2];
297         int n;
298         struct afb_wsj1_msg *msg;
299         char *verb;
300
301         /* allocate */
302         msg = calloc(1, sizeof *msg);
303         if (msg == NULL) {
304                 errno = ENOMEM;
305                 goto alloc_error;
306         }
307
308         /* scan */
309         n = wsj1_msg_scan(text, items);
310         if (n <= 0)
311                 goto bad_header;
312
313         /* scans code: 2|3|4|5 */
314         if (items[0][1] != 1)
315                 goto bad_header;
316
317         switch (text[items[0][0]]) {
318         case '2': msg->code = CALL; break;
319         case '3': msg->code = RETOK; break;
320         case '4': msg->code = RETERR; break;
321         case '5': msg->code = EVENT; break;
322         default: goto bad_header;
323         }
324
325         /* fills the message */
326         switch (msg->code) {
327         case CALL:
328                 if (n != 4 && n != 5) goto bad_header;
329                 msg->id = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
330                 msg->api = wsj1_msg_parse_string(text, items[2][0], items[2][1]);
331                 verb = strchr(msg->api, '/');
332                 if (verb == NULL) goto bad_header;
333                 *verb++ = 0;
334                 if (!*verb || *verb == '/') goto bad_header;
335                 msg->verb = verb;
336                 msg->object_s = wsj1_msg_parse_extract(text, items[3][0], items[3][1]);
337                 msg->object_s_length = items[3][1];
338                 msg->token = n == 5 ? wsj1_msg_parse_string(text, items[4][0], items[4][1]) : NULL;
339                 break;
340         case RETOK:
341         case RETERR:
342                 if (n != 3 && n != 4) goto bad_header;
343                 msg->id = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
344                 msg->object_s = wsj1_msg_parse_extract(text, items[2][0], items[2][1]);
345                 msg->object_s_length = items[2][1];
346                 msg->token = n == 5 ? wsj1_msg_parse_string(text, items[3][0], items[3][1]) : NULL;
347                 break;
348         case EVENT:
349                 if (n != 3) goto bad_header;
350                 msg->event = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
351                 msg->object_s = wsj1_msg_parse_extract(text, items[2][0], items[2][1]);
352                 msg->object_s_length = items[2][1];
353                 break;
354         }
355         /* done */
356         msg->text = text;
357
358         /* fill and record the request */
359         msg->refcount = 1;
360         afb_wsj1_addref(wsj1);
361         msg->wsj1 = wsj1;
362         pthread_mutex_lock(&wsj1->mutex);
363         msg->next = wsj1->messages;
364         if (msg->next != NULL)
365                 msg->next->previous = msg;
366         wsj1->messages = msg;
367         pthread_mutex_unlock(&wsj1->mutex);
368
369         return msg;
370
371 bad_header:
372         errno = EBADMSG;
373         free(msg);
374
375 alloc_error:
376         free(text);
377         return NULL;
378 }
379
380 static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size)
381 {
382         struct wsj1_call *call;
383         struct afb_wsj1_msg *msg;
384
385         /* allocate */
386         msg = wsj1_msg_make(wsj1, text, size);
387         if (msg == NULL) {
388                 afb_ws_close(wsj1->ws, errno == EBADMSG
389                         ? WEBSOCKET_CODE_POLICY_VIOLATION
390                         : WEBSOCKET_CODE_INTERNAL_ERROR, NULL);
391                 return;
392         }
393
394         /* handle the message */
395         switch (msg->code) {
396         case CALL:
397                 wsj1->itf->on_call(wsj1->closure, msg->api, msg->verb, msg);
398                 break;
399         case RETOK:
400         case RETERR:
401                 call = wsj1_call_search(wsj1, msg->id, 1);
402                 if (call == NULL)
403                         afb_ws_close(wsj1->ws, WEBSOCKET_CODE_POLICY_VIOLATION, NULL);
404                 else
405                         call->callback(call->closure, msg);
406                 free(call);
407                 break;
408         case EVENT:
409                 if (wsj1->itf->on_event != NULL)
410                         wsj1->itf->on_event(wsj1->closure, msg->event, msg);
411                 break;
412         }
413         afb_wsj1_msg_unref(msg);
414 }
415
416 void afb_wsj1_msg_addref(struct afb_wsj1_msg *msg)
417 {
418         if (msg != NULL)
419                 __atomic_add_fetch(&msg->refcount, 1, __ATOMIC_RELAXED);
420 }
421
422 void afb_wsj1_msg_unref(struct afb_wsj1_msg *msg)
423 {
424         if (msg != NULL && !__atomic_sub_fetch(&msg->refcount, 1, __ATOMIC_RELAXED)) {
425                 /* unlink the message */
426                 pthread_mutex_lock(&msg->wsj1->mutex);
427                 if (msg->next != NULL)
428                         msg->next->previous = msg->previous;
429                 if (msg->previous == NULL)
430                         msg->wsj1->messages = msg->next;
431                 else
432                         msg->previous->next = msg->next;
433                 pthread_mutex_unlock(&msg->wsj1->mutex);
434                 /* free ressources */
435                 afb_wsj1_unref(msg->wsj1);
436                 json_object_put(msg->object_j);
437                 free(msg->text);
438                 free(msg);
439         }
440 }
441
442 const char *afb_wsj1_msg_object_s(struct afb_wsj1_msg *msg)
443 {
444         return msg->object_s;
445 }
446
447 struct json_object *afb_wsj1_msg_object_j(struct afb_wsj1_msg *msg)
448 {
449         enum json_tokener_error jerr;
450         struct json_object *object = msg->object_j;
451         if (object == NULL) {
452                 pthread_mutex_lock(&msg->wsj1->mutex);
453                 json_tokener_reset(msg->wsj1->tokener);
454                 object = json_tokener_parse_ex(msg->wsj1->tokener, msg->object_s, 1 + (int)msg->object_s_length);
455                 jerr = json_tokener_get_error(msg->wsj1->tokener);
456                 pthread_mutex_unlock(&msg->wsj1->mutex);
457                 if (jerr != json_tokener_success) {
458                         /* lazy error detection of json request. Is it to improve? */
459                         object = json_object_new_string_len(msg->object_s, (int)msg->object_s_length);
460                 }
461                 msg->object_j = object;
462         }
463         return object;
464 }
465
466 int afb_wsj1_msg_is_call(struct afb_wsj1_msg *msg)
467 {
468         return msg->code == CALL;
469 }
470
471 int afb_wsj1_msg_is_reply(struct afb_wsj1_msg *msg)
472 {
473         return msg->code == RETOK || msg->code == RETERR;
474 }
475
476 int afb_wsj1_msg_is_reply_ok(struct afb_wsj1_msg *msg)
477 {
478         return msg->code == RETOK;
479 }
480
481 int afb_wsj1_msg_is_reply_error(struct afb_wsj1_msg *msg)
482 {
483         return msg->code == RETERR;
484 }
485
486 int afb_wsj1_msg_is_event(struct afb_wsj1_msg *msg)
487 {
488         return msg->code == EVENT;
489 }
490
491 const char *afb_wsj1_msg_api(struct afb_wsj1_msg *msg)
492 {
493         return msg->api;
494 }
495
496 const char *afb_wsj1_msg_verb(struct afb_wsj1_msg *msg)
497 {
498         return msg->verb;
499 }
500
501 const char *afb_wsj1_msg_event(struct afb_wsj1_msg *msg)
502 {
503         return msg->event;
504 }
505
506 const char *afb_wsj1_msg_token(struct afb_wsj1_msg *msg)
507 {
508         return msg->token;
509 }
510
511 struct afb_wsj1 *afb_wsj1_msg_wsj1(struct afb_wsj1_msg *msg)
512 {
513         return msg->wsj1;
514 }
515
516 int afb_wsj1_close(struct afb_wsj1 *wsj1, uint16_t code, const char *text)
517 {
518         return afb_ws_close(wsj1->ws, code, text);
519 }
520
521 static int wsj1_send_isot(struct afb_wsj1 *wsj1, int i1, const char *s1, const char *o1, const char *t1)
522 {
523         char code[2] = { (char)('0' + i1), 0 };
524         return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",", o1 == NULL ? "null" : o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL);
525 }
526
527 static int wsj1_send_issot(struct afb_wsj1 *wsj1, int i1, const char *s1, const char *s2, const char *o1, const char *t1)
528 {
529         char code[2] = { (char)('0' + i1), 0 };
530         return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",\"", s2, "\",", o1 == NULL ? "null" : o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL);
531 }
532
533 int afb_wsj1_send_event_j(struct afb_wsj1 *wsj1, const char *event, struct json_object *object)
534 {
535         const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN|JSON_C_TO_STRING_NOSLASHESCAPE);
536         int rc = afb_wsj1_send_event_s(wsj1, event, objstr);
537         json_object_put(object);
538         return rc;
539 }
540
541 int afb_wsj1_send_event_s(struct afb_wsj1 *wsj1, const char *event, const char *object)
542 {
543         return wsj1_send_isot(wsj1, EVENT, event, object, NULL);
544 }
545
546 int afb_wsj1_call_j(struct afb_wsj1 *wsj1, const char *api, const char *verb, struct json_object *object, void (*on_reply)(void *closure, struct afb_wsj1_msg *msg), void *closure)
547 {
548         const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN|JSON_C_TO_STRING_NOSLASHESCAPE);
549         int rc = afb_wsj1_call_s(wsj1, api, verb, objstr, on_reply, closure);
550         json_object_put(object);
551         return rc;
552 }
553
554 int afb_wsj1_call_s(struct afb_wsj1 *wsj1, const char *api, const char *verb, const char *object, void (*on_reply)(void *closure, struct afb_wsj1_msg *msg), void *closure)
555 {
556         int rc;
557         struct wsj1_call *call;
558         char *tag;
559
560         /* allocates the call */
561         call = wsj1_call_create(wsj1, on_reply, closure);
562         if (call == NULL) {
563                 errno = ENOMEM;
564                 return -1;
565         }
566
567         /* makes the tag */
568         tag = alloca(2 + strlen(api) + strlen(verb));
569         stpcpy(stpcpy(stpcpy(tag, api), "/"), verb);
570
571         /* makes the call */
572         rc = wsj1_send_issot(wsj1, CALL, call->id, tag, object, NULL);
573         if (rc < 0) {
574                 wsj1_call_search(wsj1, call->id, 1);
575                 free(call);
576         }
577         return rc;
578 }
579
580 int afb_wsj1_reply_j(struct afb_wsj1_msg *msg, struct json_object *object, const char *token, int iserror)
581 {
582         const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN|JSON_C_TO_STRING_NOSLASHESCAPE);
583         int rc = afb_wsj1_reply_s(msg, objstr, token, iserror);
584         json_object_put(object);
585         return rc;
586 }
587
588 int afb_wsj1_reply_s(struct afb_wsj1_msg *msg, const char *object, const char *token, int iserror)
589 {
590         return wsj1_send_isot(msg->wsj1, iserror ? RETERR : RETOK, msg->id, object, token);
591 }
592