afb-wsj1: Split 'wsj1_on_text' in 2
[src/app-framework-binder.git] / src / afb-wsj1.c
1 /*
2  * Copyright (C) 2016, 2017, 2018 "IoT.bzh"
3  * Author: José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <assert.h>
23 #include <errno.h>
24 #include <string.h>
25 #include <stdio.h>
26 #include <pthread.h>
27
28 #include <json-c/json.h>
29 #if !defined(JSON_C_TO_STRING_NOSLASHESCAPE)
30 #define JSON_C_TO_STRING_NOSLASHESCAPE 0
31 #endif
32
33 #include "afb-ws.h"
34 #include "afb-wsj1.h"
35 #include "fdev.h"
36
37 #define CALL 2
38 #define RETOK 3
39 #define RETERR 4
40 #define EVENT 5
41
42 #define WEBSOCKET_CODE_POLICY_VIOLATION  1008
43 #define WEBSOCKET_CODE_INTERNAL_ERROR    1011
44
45 static void wsj1_on_hangup(struct afb_wsj1 *wsj1);
46 static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size);
47
48 static struct afb_ws_itf wsj1_itf = {
49         .on_hangup = (void*)wsj1_on_hangup,
50         .on_text = (void*)wsj1_on_text
51 };
52
53 struct wsj1_call
54 {
55         struct wsj1_call *next;
56         void (*callback)(void *, struct afb_wsj1_msg *);
57         void *closure;
58         char id[16];
59 };
60
61 struct afb_wsj1_msg
62 {
63         int refcount;
64         struct afb_wsj1 *wsj1;
65         struct afb_wsj1_msg *next, *previous;
66         char *text;
67         int code;
68         const char *id;
69         const char *api;
70         const char *verb;
71         const char *event;
72         const char *object_s;
73         size_t object_s_length;
74         const char *token;
75         struct json_object *object_j;
76 };
77
78 struct afb_wsj1
79 {
80         int refcount;
81         int genid;
82         struct afb_wsj1_itf *itf;
83         void *closure;
84         struct json_tokener *tokener;
85         struct afb_ws *ws;
86         struct afb_wsj1_msg *messages;
87         struct wsj1_call *calls;
88         pthread_mutex_t mutex;
89 };
90
91 struct afb_wsj1 *afb_wsj1_create(struct fdev *fdev, struct afb_wsj1_itf *itf, void *closure)
92 {
93         struct afb_wsj1 *result;
94
95         assert(fdev);
96         assert(itf);
97         assert(itf->on_call);
98
99         result = calloc(1, sizeof * result);
100         if (result == NULL)
101                 goto error;
102
103         result->refcount = 1;
104         result->itf = itf;
105         result->closure = closure;
106         pthread_mutex_init(&result->mutex, NULL);
107
108         result->tokener = json_tokener_new();
109         if (result->tokener == NULL)
110                 goto error2;
111
112         result->ws = afb_ws_create(fdev, &wsj1_itf, result);
113         if (result->ws == NULL)
114                 goto error3;
115
116         return result;
117
118 error3:
119         json_tokener_free(result->tokener);
120 error2:
121         free(result);
122 error:
123         fdev_unref(fdev);
124         return NULL;
125 }
126
127 void afb_wsj1_addref(struct afb_wsj1 *wsj1)
128 {
129         if (wsj1)
130                 __atomic_add_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED);
131 }
132
133 void afb_wsj1_unref(struct afb_wsj1 *wsj1)
134 {
135         if (wsj1 && !__atomic_sub_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED)) {
136                 afb_ws_destroy(wsj1->ws);
137                 json_tokener_free(wsj1->tokener);
138                 free(wsj1);
139         }
140 }
141
142 static void wsj1_on_hangup(struct afb_wsj1 *wsj1)
143 {
144         if (wsj1->itf->on_hangup != NULL)
145                 wsj1->itf->on_hangup(wsj1->closure, wsj1);
146 }
147
148
149 static struct wsj1_call *wsj1_locked_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
150 {
151         struct wsj1_call *r, **p;
152
153         p = &wsj1->calls;
154         while((r = *p) != NULL) {
155                 if (strcmp(r->id, id) == 0) {
156                         if (remove)
157                                 *p = r->next;
158                         break;
159                 }
160                 p = &r->next;
161         }
162
163         return r;
164 }
165
166 static struct wsj1_call *wsj1_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
167 {
168         struct wsj1_call *r;
169
170         pthread_mutex_lock(&wsj1->mutex);
171         r = wsj1_locked_call_search(wsj1, id, remove);
172         pthread_mutex_unlock(&wsj1->mutex);
173
174         return r;
175 }
176
177 static struct wsj1_call *wsj1_call_create(struct afb_wsj1 *wsj1, void (*on_reply)(void*,struct afb_wsj1_msg*), void *closure)
178 {
179         struct wsj1_call *call = malloc(sizeof *call);
180         if (call == NULL)
181                 errno = ENOMEM;
182         else {
183                 pthread_mutex_lock(&wsj1->mutex);
184                 do {
185                         if (wsj1->genid == 0)
186                                 wsj1->genid = 999999;
187                         sprintf(call->id, "%d", wsj1->genid--);
188                 } while (wsj1_locked_call_search(wsj1, call->id, 0) != NULL);
189                 call->callback = on_reply;
190                 call->closure = closure;
191                 call->next = wsj1->calls;
192                 wsj1->calls = call;
193                 pthread_mutex_unlock(&wsj1->mutex);
194         }
195         return call;
196 }
197
198
199 static int wsj1_msg_scan(char *text, size_t items[10][2])
200 {
201         char *pos, *beg, *end, c;
202         int aux, n = 0;
203
204         /* scan */
205         pos = text;
206
207         /* scans: [ */
208         while(*pos == ' ') pos++;
209         if (*pos++ != '[') goto bad_scan;
210
211         /* scans list */
212         while(*pos == ' ') pos++;
213         if (*pos != ']') {
214                 for (;;) {
215                         if (n == 10)
216                                 goto bad_scan;
217                         beg = pos;
218                         aux = 0;
219                         while (aux != 0 || (*pos != ',' && *pos != ']')) {
220                                 switch(*pos++) {
221                                 case '{': case '[': aux++; break;
222                                 case '}': case ']': if (aux--) break;
223                                 case 0: goto bad_scan;
224                                 case '"':
225                                         do {
226                                                 switch(c = *pos++) {
227                                                 case '\\': if (*pos++) break;
228                                                 case 0: goto bad_scan;
229                                                 }
230                                         } while(c != '"');
231                                 }
232                         }
233                         end = pos;
234                         while (end > beg && end[-1] == ' ')
235                                 end--;
236                         items[n][0] = beg - text; /* start offset */
237                         items[n][1] = end - beg;  /* length */
238                         n++;
239                         if (*pos == ']')
240                                 break;
241                         while(*++pos == ' ');
242                 }
243         }
244         while(*++pos == ' ');
245         if (*pos) goto bad_scan;
246         return n;
247
248 bad_scan:
249         return -1;
250 }
251
252 static char *wsj1_msg_parse_extract(char *text, size_t offset, size_t size)
253 {
254         text[offset + size] = 0;
255         return text + offset;
256 }
257
258 static char *wsj1_msg_parse_string(char *text, size_t offset, size_t size)
259 {
260         if (size > 1 && text[offset] == '"') {
261                 offset += 1;
262                 size -= 2;
263         }
264         return wsj1_msg_parse_extract(text, offset, size);
265 }
266
267 static struct afb_wsj1_msg *wsj1_msg_make(struct afb_wsj1 *wsj1, char *text, size_t size)
268 {
269         size_t items[10][2];
270         int n;
271         struct afb_wsj1_msg *msg;
272         char *verb;
273
274         /* allocate */
275         msg = calloc(1, sizeof *msg);
276         if (msg == NULL) {
277                 errno = ENOMEM;
278                 goto alloc_error;
279         }
280
281         /* scan */
282         n = wsj1_msg_scan(text, items);
283         if (n < 0)
284                 goto bad_header;
285
286         /* scans code: 2|3|4|5 */
287         if (items[0][1] != 1) goto bad_header;
288         switch (text[items[0][0]]) {
289         case '2': msg->code = CALL; break;
290         case '3': msg->code = RETOK; break;
291         case '4': msg->code = RETERR; break;
292         case '5': msg->code = EVENT; break;
293         default: goto bad_header;
294         }
295
296         /* fills the message */
297         switch (msg->code) {
298         case CALL:
299                 if (n != 4 && n != 5) goto bad_header;
300                 msg->id = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
301                 msg->api = wsj1_msg_parse_string(text, items[2][0], items[2][1]);
302                 verb = strchr(msg->api, '/');
303                 if (verb == NULL) goto bad_header;
304                 *verb++ = 0;
305                 if (!*verb || *verb == '/') goto bad_header;
306                 msg->verb = verb;
307                 msg->object_s = wsj1_msg_parse_extract(text, items[3][0], items[3][1]);
308                 msg->object_s_length = items[3][1];
309                 msg->token = n == 5 ? wsj1_msg_parse_string(text, items[4][0], items[4][1]) : NULL;
310                 break;
311         case RETOK:
312         case RETERR:
313                 if (n != 3 && n != 4) goto bad_header;
314                 msg->id = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
315                 msg->object_s = wsj1_msg_parse_extract(text, items[2][0], items[2][1]);
316                 msg->object_s_length = items[2][1];
317                 msg->token = n == 5 ? wsj1_msg_parse_string(text, items[3][0], items[3][1]) : NULL;
318                 break;
319         case EVENT:
320                 if (n != 3) goto bad_header;
321                 msg->event = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
322                 msg->object_s = wsj1_msg_parse_extract(text, items[2][0], items[2][1]);
323                 msg->object_s_length = items[2][1];
324                 break;
325         }
326         /* done */
327         msg->text = text;
328
329         /* fill and record the request */
330         msg->refcount = 1;
331         afb_wsj1_addref(wsj1);
332         msg->wsj1 = wsj1;
333         pthread_mutex_lock(&wsj1->mutex);
334         msg->next = wsj1->messages;
335         if (msg->next != NULL)
336                 msg->next->previous = msg;
337         wsj1->messages = msg;
338         pthread_mutex_unlock(&wsj1->mutex);
339
340         return msg;
341
342 bad_header:
343         errno = EBADMSG;
344         free(msg);
345
346 alloc_error:
347         free(text);
348         return NULL;
349 }
350
351 static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size)
352 {
353         struct wsj1_call *call;
354         struct afb_wsj1_msg *msg;
355
356         /* allocate */
357         msg = wsj1_msg_make(wsj1, text, size);
358         if (msg == NULL) {
359                 afb_ws_close(wsj1->ws, errno == EBADMSG
360                         ? WEBSOCKET_CODE_POLICY_VIOLATION
361                         : WEBSOCKET_CODE_INTERNAL_ERROR, NULL);
362                 return;
363         }
364
365         /* handle the message */
366         switch (msg->code) {
367         case CALL:
368                 wsj1->itf->on_call(wsj1->closure, msg->api, msg->verb, msg);
369                 break;
370         case RETOK:
371         case RETERR:
372                 call = wsj1_call_search(wsj1, msg->id, 1);
373                 if (call == NULL)
374                         afb_ws_close(wsj1->ws, WEBSOCKET_CODE_POLICY_VIOLATION, NULL);
375                 else
376                         call->callback(call->closure, msg);
377                 free(call);
378                 break;
379         case EVENT:
380                 if (wsj1->itf->on_event != NULL)
381                         wsj1->itf->on_event(wsj1->closure, msg->event, msg);
382                 break;
383         }
384         afb_wsj1_msg_unref(msg);
385 }
386
387 void afb_wsj1_msg_addref(struct afb_wsj1_msg *msg)
388 {
389         if (msg != NULL)
390                 __atomic_add_fetch(&msg->refcount, 1, __ATOMIC_RELAXED);
391 }
392
393 void afb_wsj1_msg_unref(struct afb_wsj1_msg *msg)
394 {
395         if (msg != NULL && !__atomic_sub_fetch(&msg->refcount, 1, __ATOMIC_RELAXED)) {
396                 /* unlink the message */
397                 pthread_mutex_lock(&msg->wsj1->mutex);
398                 if (msg->next != NULL)
399                         msg->next->previous = msg->previous;
400                 if (msg->previous == NULL)
401                         msg->wsj1->messages = msg->next;
402                 else
403                         msg->previous->next = msg->next;
404                 pthread_mutex_unlock(&msg->wsj1->mutex);
405                 /* free ressources */
406                 afb_wsj1_unref(msg->wsj1);
407                 json_object_put(msg->object_j);
408                 free(msg->text);
409                 free(msg);
410         }
411 }
412
413 const char *afb_wsj1_msg_object_s(struct afb_wsj1_msg *msg)
414 {
415         return msg->object_s;
416 }
417
418 struct json_object *afb_wsj1_msg_object_j(struct afb_wsj1_msg *msg)
419 {
420         enum json_tokener_error jerr;
421         struct json_object *object = msg->object_j;
422         if (object == NULL) {
423                 pthread_mutex_lock(&msg->wsj1->mutex);
424                 json_tokener_reset(msg->wsj1->tokener);
425                 object = json_tokener_parse_ex(msg->wsj1->tokener, msg->object_s, 1 + (int)msg->object_s_length);
426                 jerr = json_tokener_get_error(msg->wsj1->tokener);
427                 pthread_mutex_unlock(&msg->wsj1->mutex);
428                 if (jerr != json_tokener_success) {
429                         /* lazy error detection of json request. Is it to improve? */
430                         object = json_object_new_string_len(msg->object_s, (int)msg->object_s_length);
431                 }
432                 msg->object_j = object;
433         }
434         return object;
435 }
436
437 int afb_wsj1_msg_is_call(struct afb_wsj1_msg *msg)
438 {
439         return msg->code == CALL;
440 }
441
442 int afb_wsj1_msg_is_reply(struct afb_wsj1_msg *msg)
443 {
444         return msg->code == RETOK || msg->code == RETERR;
445 }
446
447 int afb_wsj1_msg_is_reply_ok(struct afb_wsj1_msg *msg)
448 {
449         return msg->code == RETOK;
450 }
451
452 int afb_wsj1_msg_is_reply_error(struct afb_wsj1_msg *msg)
453 {
454         return msg->code == RETERR;
455 }
456
457 int afb_wsj1_msg_is_event(struct afb_wsj1_msg *msg)
458 {
459         return msg->code == EVENT;
460 }
461
462 const char *afb_wsj1_msg_api(struct afb_wsj1_msg *msg)
463 {
464         return msg->api;
465 }
466
467 const char *afb_wsj1_msg_verb(struct afb_wsj1_msg *msg)
468 {
469         return msg->verb;
470 }
471
472 const char *afb_wsj1_msg_event(struct afb_wsj1_msg *msg)
473 {
474         return msg->event;
475 }
476
477 const char *afb_wsj1_msg_token(struct afb_wsj1_msg *msg)
478 {
479         return msg->token;
480 }
481
482 struct afb_wsj1 *afb_wsj1_msg_wsj1(struct afb_wsj1_msg *msg)
483 {
484         return msg->wsj1;
485 }
486
487 int afb_wsj1_close(struct afb_wsj1 *wsj1, uint16_t code, const char *text)
488 {
489         return afb_ws_close(wsj1->ws, code, text);
490 }
491
492 static int wsj1_send_isot(struct afb_wsj1 *wsj1, int i1, const char *s1, const char *o1, const char *t1)
493 {
494         char code[2] = { (char)('0' + i1), 0 };
495         return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",", o1 == NULL ? "null" : o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL);
496 }
497
498 static int wsj1_send_issot(struct afb_wsj1 *wsj1, int i1, const char *s1, const char *s2, const char *o1, const char *t1)
499 {
500         char code[2] = { (char)('0' + i1), 0 };
501         return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",\"", s2, "\",", o1 == NULL ? "null" : o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL);
502 }
503
504 int afb_wsj1_send_event_j(struct afb_wsj1 *wsj1, const char *event, struct json_object *object)
505 {
506         const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN|JSON_C_TO_STRING_NOSLASHESCAPE);
507         int rc = afb_wsj1_send_event_s(wsj1, event, objstr);
508         json_object_put(object);
509         return rc;
510 }
511
512 int afb_wsj1_send_event_s(struct afb_wsj1 *wsj1, const char *event, const char *object)
513 {
514         return wsj1_send_isot(wsj1, EVENT, event, object, NULL);
515 }
516
517 int afb_wsj1_call_j(struct afb_wsj1 *wsj1, const char *api, const char *verb, struct json_object *object, void (*on_reply)(void *closure, struct afb_wsj1_msg *msg), void *closure)
518 {
519         const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN|JSON_C_TO_STRING_NOSLASHESCAPE);
520         int rc = afb_wsj1_call_s(wsj1, api, verb, objstr, on_reply, closure);
521         json_object_put(object);
522         return rc;
523 }
524
525 int afb_wsj1_call_s(struct afb_wsj1 *wsj1, const char *api, const char *verb, const char *object, void (*on_reply)(void *closure, struct afb_wsj1_msg *msg), void *closure)
526 {
527         int rc;
528         struct wsj1_call *call;
529         char *tag;
530
531         /* allocates the call */
532         call = wsj1_call_create(wsj1, on_reply, closure);
533         if (call == NULL) {
534                 errno = ENOMEM;
535                 return -1;
536         }
537
538         /* makes the tag */
539         tag = alloca(2 + strlen(api) + strlen(verb));
540         stpcpy(stpcpy(stpcpy(tag, api), "/"), verb);
541
542         /* makes the call */
543         rc = wsj1_send_issot(wsj1, CALL, call->id, tag, object, NULL);
544         if (rc < 0) {
545                 wsj1_call_search(wsj1, call->id, 1);
546                 free(call);
547         }
548         return rc;
549 }
550
551 int afb_wsj1_reply_j(struct afb_wsj1_msg *msg, struct json_object *object, const char *token, int iserror)
552 {
553         const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN|JSON_C_TO_STRING_NOSLASHESCAPE);
554         int rc = afb_wsj1_reply_s(msg, objstr, token, iserror);
555         json_object_put(object);
556         return rc;
557 }
558
559 int afb_wsj1_reply_s(struct afb_wsj1_msg *msg, const char *object, const char *token, int iserror)
560 {
561         return wsj1_send_isot(msg->wsj1, iserror ? RETERR : RETOK, msg->id, object, token);
562 }
563