fdev: Introduce fdev for file event handling
[src/app-framework-binder.git] / src / afb-wsj1.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author: José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <assert.h>
23 #include <errno.h>
24 #include <string.h>
25 #include <stdio.h>
26 #include <pthread.h>
27
28 #include <json-c/json.h>
29
30 #include "afb-ws.h"
31 #include "afb-wsj1.h"
32 #include "fdev.h"
33
34 #define CALL 2
35 #define RETOK 3
36 #define RETERR 4
37 #define EVENT 5
38
39 static void wsj1_on_hangup(struct afb_wsj1 *wsj1);
40 static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size);
41
42 static struct afb_ws_itf wsj1_itf = {
43         .on_hangup = (void*)wsj1_on_hangup,
44         .on_text = (void*)wsj1_on_text
45 };
46
47 struct wsj1_call
48 {
49         struct wsj1_call *next;
50         void (*callback)(void *, struct afb_wsj1_msg *);
51         void *closure;
52         char id[16];
53 };
54
55 struct afb_wsj1_msg
56 {
57         int refcount;
58         struct afb_wsj1 *wsj1;
59         struct afb_wsj1_msg *next, *previous;
60         char *text;
61         int code;
62         char *id;
63         char *api;
64         char *verb;
65         char *event;
66         char *object_s;
67         size_t object_s_length;
68         char *token;
69         struct json_object *object_j;
70 };
71
72 struct afb_wsj1
73 {
74         int refcount;
75         int genid;
76         struct afb_wsj1_itf *itf;
77         void *closure;
78         struct json_tokener *tokener;
79         struct afb_ws *ws;
80         struct afb_wsj1_msg *messages;
81         struct wsj1_call *calls;
82         pthread_mutex_t mutex;
83 };
84
85 struct afb_wsj1 *afb_wsj1_create(struct fdev *fdev, struct afb_wsj1_itf *itf, void *closure)
86 {
87         struct afb_wsj1 *result;
88
89         assert(fdev);
90         assert(itf);
91         assert(itf->on_call);
92
93         result = calloc(1, sizeof * result);
94         if (result == NULL)
95                 goto error;
96
97         result->refcount = 1;
98         result->itf = itf;
99         result->closure = closure;
100         pthread_mutex_init(&result->mutex, NULL);
101
102         result->tokener = json_tokener_new();
103         if (result->tokener == NULL)
104                 goto error2;
105
106         result->ws = afb_ws_create(fdev, &wsj1_itf, result);
107         if (result->ws == NULL)
108                 goto error3;
109
110         return result;
111
112 error3:
113         json_tokener_free(result->tokener);
114 error2:
115         free(result);
116 error:
117         fdev_unref(fdev);
118         return NULL;
119 }
120
121 void afb_wsj1_addref(struct afb_wsj1 *wsj1)
122 {
123         if (wsj1)
124                 __atomic_add_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED);
125 }
126
127 void afb_wsj1_unref(struct afb_wsj1 *wsj1)
128 {
129         if (wsj1 && !__atomic_sub_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED)) {
130                 afb_ws_destroy(wsj1->ws);
131                 json_tokener_free(wsj1->tokener);
132                 free(wsj1);
133         }
134 }
135
136 static void wsj1_on_hangup(struct afb_wsj1 *wsj1)
137 {
138         if (wsj1->itf->on_hangup != NULL)
139                 wsj1->itf->on_hangup(wsj1->closure, wsj1);
140 }
141
142
143 static struct wsj1_call *wsj1_locked_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
144 {
145         struct wsj1_call *r, **p;
146
147         p = &wsj1->calls;
148         while((r = *p) != NULL) {
149                 if (strcmp(r->id, id) == 0) {
150                         if (remove)
151                                 *p = r->next;
152                         break;
153                 }
154                 p = &r->next;
155         }
156
157         return r;
158 }
159
160 static struct wsj1_call *wsj1_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
161 {
162         struct wsj1_call *r;
163
164         pthread_mutex_lock(&wsj1->mutex);
165         r = wsj1_locked_call_search(wsj1, id, remove);
166         pthread_mutex_unlock(&wsj1->mutex);
167
168         return r;
169 }
170
171 static struct wsj1_call *wsj1_call_create(struct afb_wsj1 *wsj1, void (*on_reply)(void*,struct afb_wsj1_msg*), void *closure)
172 {
173         struct wsj1_call *call = malloc(sizeof *call);
174         if (call == NULL)
175                 errno = ENOMEM;
176         else {
177                 pthread_mutex_lock(&wsj1->mutex);
178                 do {
179                         if (wsj1->genid == 0)
180                                 wsj1->genid = 999999;
181                         sprintf(call->id, "%d", wsj1->genid--);
182                 } while (wsj1_locked_call_search(wsj1, call->id, 0) != NULL);
183                 call->callback = on_reply;
184                 call->closure = closure;
185                 call->next = wsj1->calls;
186                 wsj1->calls = call;
187                 pthread_mutex_unlock(&wsj1->mutex);
188         }
189         return call;
190 }
191
192
193 static int wsj1_msg_scan(char *text, size_t items[10][2])
194 {
195         char *pos, *beg, *end, c;
196         int aux, n = 0;
197
198         /* scan */
199         pos = text;
200
201         /* scans: [ */
202         while(*pos == ' ') pos++;
203         if (*pos++ != '[') goto bad_scan;
204
205         /* scans list */
206         while(*pos == ' ') pos++;
207         if (*pos != ']') {
208                 for (;;) {
209                         if (n == 10)
210                                 goto bad_scan;
211                         beg = pos;
212                         aux = 0;
213                         while (aux != 0 || (*pos != ',' && *pos != ']')) {
214                                 switch(*pos++) {
215                                 case '{': case '[': aux++; break;
216                                 case '}': case ']': if (aux--) break;
217                                 case 0: goto bad_scan;
218                                 case '"':
219                                         do {
220                                                 switch(c = *pos++) {
221                                                 case '\\': if (*pos++) break;
222                                                 case 0: goto bad_scan;
223                                                 }
224                                         } while(c != '"');
225                                 }
226                         }
227                         end = pos;
228                         while (end > beg && end[-1] == ' ')
229                                 end--;
230                         items[n][0] = beg - text; /* start offset */
231                         items[n][1] = end - beg;  /* length */
232                         n++;
233                         if (*pos == ']')
234                                 break;
235                         while(*++pos == ' ');
236                 }
237         }
238         while(*++pos == ' ');
239         if (*pos) goto bad_scan;
240         return n;
241
242 bad_scan:
243         return -1;
244 }
245
246 static char *wsj1_msg_parse_extract(char *text, size_t offset, size_t size)
247 {
248         text[offset + size] = 0;
249         return text + offset;
250 }
251
252 static char *wsj1_msg_parse_string(char *text, size_t offset, size_t size)
253 {
254         if (size > 1 && text[offset] == '"') {
255                 offset += 1;
256                 size -= 2;
257         }
258         return wsj1_msg_parse_extract(text, offset, size);
259 }
260
261 static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size)
262 {
263         size_t items[10][2];
264         int n;
265         struct afb_wsj1_msg *msg;
266         struct wsj1_call *call = NULL;
267
268         /* allocate */
269         msg = calloc(1, sizeof *msg);
270         if (msg == NULL)
271                 goto alloc_error;
272
273         /* scan */
274         n = wsj1_msg_scan(text, items);
275         if (n < 0)
276                 goto bad_header;
277
278         /* scans code: 2|3|4|5 */
279         if (items[0][1] != 1) goto bad_header;
280         switch (text[items[0][0]]) {
281         case '2': msg->code = CALL; break;
282         case '3': msg->code = RETOK; break;
283         case '4': msg->code = RETERR; break;
284         case '5': msg->code = EVENT; break;
285         default: goto bad_header;
286         }
287
288         /* fills the message */
289         switch (msg->code) {
290         case CALL:
291                 if (n != 4 && n != 5) goto bad_header;
292                 msg->id = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
293                 msg->api = wsj1_msg_parse_string(text, items[2][0], items[2][1]);
294                 msg->verb = strchr(msg->api, '/');
295                 if (msg->verb == NULL) goto bad_header;
296                 *msg->verb++ = 0;
297                 msg->object_s = wsj1_msg_parse_extract(text, items[3][0], items[3][1]);
298                 msg->object_s_length = items[3][1];
299                 msg->token = n == 5 ? wsj1_msg_parse_string(text, items[4][0], items[4][1]) : NULL;
300                 break;
301         case RETOK:
302         case RETERR:
303                 if (n != 3 && n != 4) goto bad_header;
304                 msg->id = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
305                 call = wsj1_call_search(wsj1, msg->id, 1);
306                 if (call == NULL) goto bad_header;
307                 msg->object_s = wsj1_msg_parse_extract(text, items[2][0], items[2][1]);
308                 msg->object_s_length = items[2][1];
309                 msg->token = n == 5 ? wsj1_msg_parse_string(text, items[3][0], items[3][1]) : NULL;
310                 break;
311         case EVENT:
312                 if (n != 3) goto bad_header;
313                 msg->event = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
314                 msg->object_s = wsj1_msg_parse_extract(text, items[2][0], items[2][1]);
315                 msg->object_s_length = items[2][1];
316                 break;
317         }
318         /* done */
319         msg->text = text;
320
321         /* fill and record the request */
322         msg->refcount = 1;
323         afb_wsj1_addref(wsj1);
324         msg->wsj1 = wsj1;
325         pthread_mutex_lock(&wsj1->mutex);
326         msg->next = wsj1->messages;
327         if (msg->next != NULL)
328                 msg->next->previous = msg;
329         wsj1->messages = msg;
330         pthread_mutex_unlock(&wsj1->mutex);
331
332         /* incoke the handler */
333         switch (msg->code) {
334         case CALL:
335                 wsj1->itf->on_call(wsj1->closure, msg->api, msg->verb, msg);
336                 break;
337         case RETOK:
338         case RETERR:
339                 call->callback(call->closure, msg);
340                 free(call);
341                 break;
342         case EVENT:
343                 if (wsj1->itf->on_event != NULL)
344                         wsj1->itf->on_event(wsj1->closure, msg->event, msg);
345                 break;
346         }
347         afb_wsj1_msg_unref(msg);
348         return;
349
350 bad_header:
351         free(msg);
352 alloc_error:
353         free(text);
354         afb_ws_close(wsj1->ws, 1008, NULL);
355 }
356
357 void afb_wsj1_msg_addref(struct afb_wsj1_msg *msg)
358 {
359         if (msg != NULL)
360                 __atomic_add_fetch(&msg->refcount, 1, __ATOMIC_RELAXED);
361 }
362
363 void afb_wsj1_msg_unref(struct afb_wsj1_msg *msg)
364 {
365         if (msg != NULL && !__atomic_sub_fetch(&msg->refcount, 1, __ATOMIC_RELAXED)) {
366                 /* unlink the message */
367                 pthread_mutex_lock(&msg->wsj1->mutex);
368                 if (msg->next != NULL)
369                         msg->next->previous = msg->previous;
370                 if (msg->previous == NULL)
371                         msg->wsj1->messages = msg->next;
372                 else
373                         msg->previous->next = msg->next;
374                 pthread_mutex_unlock(&msg->wsj1->mutex);
375                 /* free ressources */
376                 afb_wsj1_unref(msg->wsj1);
377                 json_object_put(msg->object_j);
378                 free(msg->text);
379                 free(msg);
380         }
381 }
382
383 const char *afb_wsj1_msg_object_s(struct afb_wsj1_msg *msg)
384 {
385         return msg->object_s;
386 }
387
388 struct json_object *afb_wsj1_msg_object_j(struct afb_wsj1_msg *msg)
389 {
390         struct json_object *object = msg->object_j;
391         if (object == NULL) {
392                 pthread_mutex_lock(&msg->wsj1->mutex);
393                 json_tokener_reset(msg->wsj1->tokener);
394                 object = json_tokener_parse_ex(msg->wsj1->tokener, msg->object_s, (int)msg->object_s_length);
395                 pthread_mutex_unlock(&msg->wsj1->mutex);
396                 if (object == NULL) {
397                         /* lazy error detection of json request. Is it to improve? */
398                         object = json_object_new_string_len(msg->object_s, (int)msg->object_s_length);
399                 }
400                 msg->object_j = object;
401         }
402         return object;
403 }
404
405 int afb_wsj1_msg_is_call(struct afb_wsj1_msg *msg)
406 {
407         return msg->code == CALL;
408 }
409
410 int afb_wsj1_msg_is_reply(struct afb_wsj1_msg *msg)
411 {
412         return msg->code == RETOK || msg->code == RETERR;
413 }
414
415 int afb_wsj1_msg_is_reply_ok(struct afb_wsj1_msg *msg)
416 {
417         return msg->code == RETOK;
418 }
419
420 int afb_wsj1_msg_is_reply_error(struct afb_wsj1_msg *msg)
421 {
422         return msg->code == RETERR;
423 }
424
425 int afb_wsj1_msg_is_event(struct afb_wsj1_msg *msg)
426 {
427         return msg->code == EVENT;
428 }
429
430 const char *afb_wsj1_msg_api(struct afb_wsj1_msg *msg)
431 {
432         return msg->api;
433 }
434
435 const char *afb_wsj1_msg_verb(struct afb_wsj1_msg *msg)
436 {
437         return msg->verb;
438 }
439
440 const char *afb_wsj1_msg_event(struct afb_wsj1_msg *msg)
441 {
442         return msg->event;
443 }
444
445 const char *afb_wsj1_msg_token(struct afb_wsj1_msg *msg)
446 {
447         return msg->token;
448 }
449
450 struct afb_wsj1 *afb_wsj1_msg_wsj1(struct afb_wsj1_msg *msg)
451 {
452         return msg->wsj1;
453 }
454
455 int afb_wsj1_close(struct afb_wsj1 *wsj1, uint16_t code, const char *text)
456 {
457         return afb_ws_close(wsj1->ws, code, text);
458 }
459
460 static int wsj1_send_isot(struct afb_wsj1 *wsj1, int i1, const char *s1, const char *o1, const char *t1)
461 {
462         char code[2] = { (char)('0' + i1), 0 };
463         return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",", o1 == NULL ? "null" : o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL);
464 }
465
466 static int wsj1_send_issot(struct afb_wsj1 *wsj1, int i1, const char *s1, const char *s2, const char *o1, const char *t1)
467 {
468         char code[2] = { (char)('0' + i1), 0 };
469         return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",\"", s2, "\",", o1 == NULL ? "null" : o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL);
470 }
471
472 int afb_wsj1_send_event_j(struct afb_wsj1 *wsj1, const char *event, struct json_object *object)
473 {
474         const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
475         int rc = afb_wsj1_send_event_s(wsj1, event, objstr);
476         json_object_put(object);
477         return rc;
478 }
479
480 int afb_wsj1_send_event_s(struct afb_wsj1 *wsj1, const char *event, const char *object)
481 {
482         return wsj1_send_isot(wsj1, EVENT, event, object, NULL);
483 }
484
485 int afb_wsj1_call_j(struct afb_wsj1 *wsj1, const char *api, const char *verb, struct json_object *object, void (*on_reply)(void *closure, struct afb_wsj1_msg *msg), void *closure)
486 {
487         const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
488         int rc = afb_wsj1_call_s(wsj1, api, verb, objstr, on_reply, closure);
489         json_object_put(object);
490         return rc;
491 }
492
493 int afb_wsj1_call_s(struct afb_wsj1 *wsj1, const char *api, const char *verb, const char *object, void (*on_reply)(void *closure, struct afb_wsj1_msg *msg), void *closure)
494 {
495         int rc;
496         struct wsj1_call *call;
497         char *tag;
498
499         /* allocates the call */
500         call = wsj1_call_create(wsj1, on_reply, closure);
501         if (call == NULL) {
502                 errno = ENOMEM;
503                 return -1;
504         }
505
506         /* makes the tag */
507         tag = alloca(2 + strlen(api) + strlen(verb));
508         stpcpy(stpcpy(stpcpy(tag, api), "/"), verb);
509
510         /* makes the call */
511         rc = wsj1_send_issot(wsj1, CALL, call->id, tag, object, NULL);
512         if (rc < 0) {
513                 wsj1_call_search(wsj1, call->id, 1);
514                 free(call);
515         }
516         return rc;
517 }
518
519 int afb_wsj1_reply_j(struct afb_wsj1_msg *msg, struct json_object *object, const char *token, int iserror)
520 {
521         const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
522         int rc = afb_wsj1_reply_s(msg, objstr, token, iserror);
523         json_object_put(object);
524         return rc;
525 }
526
527 int afb_wsj1_reply_s(struct afb_wsj1_msg *msg, const char *object, const char *token, int iserror)
528 {
529         return wsj1_send_isot(msg->wsj1, iserror ? RETERR : RETOK, msg->id, object, token);
530 }
531