2 * Copyright (C) 2016-2019 "IoT.bzh"
3 * Author: José Bollo <jose.bollo@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
28 #include <json-c/json.h>
/* Fallback for json-c versions lacking this flag: defined as 0 so that OR-ing it into the serialisation flags has no effect. */
29 #if !defined(JSON_C_TO_STRING_NOSLASHESCAPE)
30 #define JSON_C_TO_STRING_NOSLASHESCAPE 0
/* Status codes passed to afb_ws_close() when an incoming message is rejected. */
42 #define WEBSOCKET_CODE_POLICY_VIOLATION 1008
43 #define WEBSOCKET_CODE_INTERNAL_ERROR 1011
/* Forward declarations: the two handlers are referenced by wsj1_itf and wsj1_msg_make is called by wsj1_on_hangup ahead of its definition. */
45 static void wsj1_on_hangup(struct afb_wsj1 *wsj1);
46 static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size);
47 static struct afb_wsj1_msg *wsj1_msg_make(struct afb_wsj1 *wsj1, char *text, size_t size);
/*
 * Handler table given to afb_ws_create(). The handlers are stored through
 * (void*) casts; the closure registered with afb_ws_create() is the
 * struct afb_wsj1 they take as first argument.
 */
49 static struct afb_ws_itf wsj1_itf = {
50 .on_hangup = (void*)wsj1_on_hangup,
51 .on_text = (void*)wsj1_on_text
/* next record in the list of pending calls (see wsj1->calls) */
56 struct wsj1_call *next;
/* reply handler, invoked as callback(closure, msg) */
57 void (*callback)(void *, struct afb_wsj1_msg *);
/* connection of the message; released with afb_wsj1_unref() when the message is freed */
65 struct afb_wsj1 *wsj1;
/* links of the doubly linked list of live messages headed by wsj1->messages */
66 struct afb_wsj1_msg *next, *previous;
/* length of object_s in bytes, terminating NUL not counted */
74 size_t object_s_length;
/* parsed form of object_s, created on demand by afb_wsj1_msg_object_j() */
76 struct json_object *object_j;
/* application callbacks: on_call, on_event and on_hangup are invoked through it */
83 struct afb_wsj1_itf *itf;
/* JSON tokener used to parse message payloads, always used with the mutex held */
85 struct json_tokener *tokener;
/* head of the list of live messages */
87 struct afb_wsj1_msg *messages;
/* head of the list of calls waiting for their reply */
88 struct wsj1_call *calls;
/* held while updating the messages list, searching or extending the calls list, and using the tokener */
89 pthread_mutex_t mutex;
/*
 * Allocates and initialises a wsj1 instance working on 'fdev'.
 * The instance starts with a reference count of 1 and records 'closure',
 * which is later handed back to the application callbacks.
 * NOTE(review): the storing of 'itf', the error paths and the return
 * statement are not visible here; presumably NULL is returned on
 * failure -- confirm.
 */
92 struct afb_wsj1 *afb_wsj1_create(struct fdev *fdev, struct afb_wsj1_itf *itf, void *closure)
94 struct afb_wsj1 *result;
/* zero-filled allocation: list heads start NULL and counters at 0 */
100 result = calloc(1, sizeof * result);
104 result->refcount = 1;
106 result->closure = closure;
107 pthread_mutex_init(&result->mutex, NULL);
/* tokener later used by afb_wsj1_msg_object_j() */
109 result->tokener = json_tokener_new();
110 if (result->tokener == NULL)
/* the websocket layer calls the wsj1_itf handlers with 'result' as closure */
113 result->ws = afb_ws_create(fdev, &wsj1_itf, result);
114 if (result->ws == NULL)
120 json_tokener_free(result->tokener);
/* Adds one reference to 'wsj1' (atomic increment of its reference count). */
128 void afb_wsj1_addref(struct afb_wsj1 *wsj1)
131 __atomic_add_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED);
/*
 * Drops one reference to 'wsj1'; a NULL pointer is accepted and ignored.
 * When the count reaches zero the websocket and the tokener are released.
 * NOTE(review): the final decrement uses __ATOMIC_RELAXED; confirm that no
 * acquire/release ordering is needed before the teardown.
 */
134 void afb_wsj1_unref(struct afb_wsj1 *wsj1)
136 if (wsj1 && !__atomic_sub_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED)) {
137 afb_ws_destroy(wsj1->ws);
138 json_tokener_free(wsj1->tokener);
/*
 * Hangup handler of the websocket.
 * Detaches the list of pending calls, delivers a synthetic error reply
 * ("disconnected" / "server hung up") to their callbacks, then calls the
 * application's on_hangup callback when one is set.
 */
143 static void wsj1_on_hangup(struct afb_wsj1 *wsj1)
145 struct wsj1_call *call, *ncall;
146 struct afb_wsj1_msg *msg;
/* payload of the error reply delivered to pending calls */
150 static const char error_object_str[] = "{"
151 "\"jtype\":\"afb-reply\","
153 "\"status\":\"disconnected\","
154 "\"info\":\"server hung up\"}}";
/* take the whole list of pending calls, leaving wsj1->calls empty */
156 ncall = __atomic_exchange_n(&wsj1->calls, NULL, __ATOMIC_RELAXED);
/* text of a RETERR reply carrying the id of the call */
160 len = asprintf(&text, "[%d,\"%s\",%s]", RETERR, call->id, error_object_str);
162 msg = wsj1_msg_make(wsj1, text, (size_t)len);
164 call->callback(call->closure, msg);
165 afb_wsj1_msg_unref(msg);
/* notify the application, if it registered for hangups */
171 if (wsj1->itf->on_hangup != NULL)
172 wsj1->itf->on_hangup(wsj1->closure, wsj1);
/*
 * Searches the pending calls for the one whose id equals 'id' (strcmp).
 * Its callers invoke it with wsj1->mutex held.
 * NOTE(review): the handling of 'remove' is not visible here; presumably a
 * non-zero value unlinks the found record -- confirm.
 */
176 static struct wsj1_call *wsj1_locked_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
178 struct wsj1_call *r, **p;
/* p designates the link leading to the record r being examined */
181 while((r = *p) != NULL) {
182 if (strcmp(r->id, id) == 0) {
/* Same search as wsj1_locked_call_search(), performed while holding wsj1->mutex. */
193 static struct wsj1_call *wsj1_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
197 pthread_mutex_lock(&wsj1->mutex);
198 r = wsj1_locked_call_search(wsj1, id, remove);
199 pthread_mutex_unlock(&wsj1->mutex);
/*
 * Allocates the record of a new pending call, gives it an id that no other
 * pending call uses, stores 'on_reply' and 'closure' in it and links it
 * ahead of the current head of wsj1->calls.
 */
204 static struct wsj1_call *wsj1_call_create(struct afb_wsj1 *wsj1, void (*on_reply)(void*,struct afb_wsj1_msg*), void *closure)
206 struct wsj1_call *call = malloc(sizeof *call);
210 pthread_mutex_lock(&wsj1->mutex);
/* genid counts down; once at 0 it restarts from 999999 */
212 if (wsj1->genid == 0)
213 wsj1->genid = 999999;
/* the id is the decimal text of the counter */
214 sprintf(call->id, "%d", wsj1->genid--);
/* try the next value while a pending call already has this id */
215 } while (wsj1_locked_call_search(wsj1, call->id, 0) != NULL);
216 call->callback = on_reply;
217 call->closure = closure;
218 call->next = wsj1->calls;
220 pthread_mutex_unlock(&wsj1->mutex);
/*
 * Locates the items of the top-level JSON array held in 'text'.
 * For the item of index n, items[n][0] receives its start offset within
 * 'text' and items[n][1] its length, trailing spaces excluded.
 * Malformed input branches to bad_scan.
 * NOTE(review): the return statements are not visible here; the caller uses
 * the result as the number of items -- confirm the failure value.
 */
226 static int wsj1_msg_scan(char *text, size_t items[10][2])
228 char *pos, *beg, *end, c;
/* skip leading spaces, then require the opening '[' */
235 while(*pos == ' ') pos++;
236 if (*pos++ != '[') goto bad_scan;
239 while(*pos == ' ') pos++;
/* advance to the ',' or ']' ending the item; aux is the nesting depth of inner objects and arrays */
246 while (aux != 0 || (*pos != ',' && *pos != ']')) {
248 case '{': case '[': aux++; break;
/* a closer met at depth 0 falls through to the error case */
249 case '}': case ']': if (aux--) break;
250 case 0: goto bad_scan;
/* a backslash consumes the next character; a NUL there, or in place of a character, is an error */
254 case '\\': if (*pos++) break;
255 case 0: goto bad_scan;
/* exclude the spaces that end the item */
261 while (end > beg && end[-1] == ' ')
263 items[n][0] = beg - text; /* start offset */
264 items[n][1] = end - beg; /* length */
/* step over the current character and the spaces following it */
268 while(*++pos == ' ');
/* likewise, then require the end of the text: anything else is an error */
271 while(*++pos == ' ');
272 if (*pos) goto bad_scan;
/*
 * Turns the item of 'size' bytes found at 'offset' in 'text' into a C
 * string, in place: the byte following the item is overwritten with NUL.
 * Returns a pointer to the start of the item.
 */
279 static char *wsj1_msg_parse_extract(char *text, size_t offset, size_t size)
281 text[offset + size] = 0;
282 return text + offset;
/*
 * Extracts, in place, the item of 'size' bytes found at 'offset' in 'text'
 * through wsj1_msg_parse_extract().
 * NOTE(review): items longer than one byte that begin with '"' get a
 * special treatment whose lines are not visible here; presumably the
 * enclosing quotes are dropped -- confirm.
 */
285 static char *wsj1_msg_parse_string(char *text, size_t offset, size_t size)
287 if (size > 1 && text[offset] == '"') {
291 return wsj1_msg_parse_extract(text, offset, size);
/*
 * Builds a message from the received 'text'.
 * The text is split in place: the string fields of the message point into
 * it. The first item is a one-character code; the layouts accepted are:
 *   code 2 (CALL):          [2, id, "api/verb", object] with optional 5th item token
 *   codes 3, 4 (RETOK/ERR): [3|4, id, object] with optional 4th item token
 *   code 5 (EVENT):         [5, event, object]
 * Any other shape branches to bad_header; callers test errno == EBADMSG
 * after a failure.
 * On success a reference is taken on 'wsj1' and the message is inserted at
 * the head of wsj1->messages.
 */
294 static struct afb_wsj1_msg *wsj1_msg_make(struct afb_wsj1 *wsj1, char *text, size_t size)
298 struct afb_wsj1_msg *msg;
/* zero-filled: the fields that are not set below stay NULL/0 */
302 msg = calloc(1, sizeof *msg);
/* locate the items of the top-level array */
309 n = wsj1_msg_scan(text, items);
313 /* scans code: 2|3|4|5 */
314 if (items[0][1] != 1) goto bad_header;
315 switch (text[items[0][0]]) {
316 case '2': msg->code = CALL; break;
317 case '3': msg->code = RETOK; break;
318 case '4': msg->code = RETERR; break;
319 case '5': msg->code = EVENT; break;
320 default: goto bad_header;
323 /* fills the message */
/* call: id, "api/verb", object and an optional token as 5th item */
326 if (n != 4 && n != 5) goto bad_header;
327 msg->id = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
328 msg->api = wsj1_msg_parse_string(text, items[2][0], items[2][1]);
/* the api and the verb are separated by the first '/' */
329 verb = strchr(msg->api, '/');
330 if (verb == NULL) goto bad_header;
/* the verb must be neither empty nor start with a second '/' */
332 if (!*verb || *verb == '/') goto bad_header;
334 msg->object_s = wsj1_msg_parse_extract(text, items[3][0], items[3][1]);
335 msg->object_s_length = items[3][1];
336 msg->token = n == 5 ? wsj1_msg_parse_string(text, items[4][0], items[4][1]) : NULL;
/* reply: id, object and an optional token as 4th item */
340 if (n != 3 && n != 4) goto bad_header;
341 msg->id = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
342 msg->object_s = wsj1_msg_parse_extract(text, items[2][0], items[2][1]);
343 msg->object_s_length = items[2][1];
/* n is 3 or 4 here, so the token (items[3]) is present exactly when n == 4; the former test n == 5 could never be true and always dropped it */
344 msg->token = n == 4 ? wsj1_msg_parse_string(text, items[3][0], items[3][1]) : NULL;
/* event: name and object, no token */
347 if (n != 3) goto bad_header;
348 msg->event = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
349 msg->object_s = wsj1_msg_parse_extract(text, items[2][0], items[2][1]);
350 msg->object_s_length = items[2][1];
356 /* fill and record the request */
/* the message keeps its connection alive until afb_wsj1_msg_unref() frees it */
358 afb_wsj1_addref(wsj1);
/* insert at the head of the list of live messages */
360 pthread_mutex_lock(&wsj1->mutex);
361 msg->next = wsj1->messages;
362 if (msg->next != NULL)
363 msg->next->previous = msg;
364 wsj1->messages = msg;
365 pthread_mutex_unlock(&wsj1->mutex);
/*
 * Text handler of the websocket: builds a message from 'text' and
 * dispatches it according to its kind (call, reply or event).
 */
378 static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size)
380 struct wsj1_call *call;
381 struct afb_wsj1_msg *msg;
384 msg = wsj1_msg_make(wsj1, text, size);
/* close the connection: policy violation for a malformed message (EBADMSG), internal error otherwise */
386 afb_ws_close(wsj1->ws, errno == EBADMSG
387 ? WEBSOCKET_CODE_POLICY_VIOLATION
388 : WEBSOCKET_CODE_INTERNAL_ERROR, NULL);
392 /* handle the message */
/* incoming call: handed to the application with its api and verb */
395 wsj1->itf->on_call(wsj1->closure, msg->api, msg->verb, msg);
/* reply: look for the pending call having the same id (remove flag set) */
399 call = wsj1_call_search(wsj1, msg->id, 1);
/* presumably the case where no pending call matches: treated as a protocol violation */
401 afb_ws_close(wsj1->ws, WEBSOCKET_CODE_POLICY_VIOLATION, NULL);
403 call->callback(call->closure, msg);
/* event: delivered only when the application provides on_event */
407 if (wsj1->itf->on_event != NULL)
408 wsj1->itf->on_event(wsj1->closure, msg->event, msg);
/* release this function's reference on the message */
411 afb_wsj1_msg_unref(msg);
/* Adds one reference to 'msg' (atomic increment of its reference count). */
414 void afb_wsj1_msg_addref(struct afb_wsj1_msg *msg)
417 __atomic_add_fetch(&msg->refcount, 1, __ATOMIC_RELAXED);
/*
 * Drops one reference to 'msg'; a NULL pointer is accepted and ignored.
 * When the count reaches zero the message is unlinked from the message list
 * of its connection, the reference it holds on that connection is released
 * and its parsed JSON object, if any, is put.
 */
420 void afb_wsj1_msg_unref(struct afb_wsj1_msg *msg)
422 if (msg != NULL && !__atomic_sub_fetch(&msg->refcount, 1, __ATOMIC_RELAXED)) {
423 /* unlink the message */
424 pthread_mutex_lock(&msg->wsj1->mutex);
425 if (msg->next != NULL)
426 msg->next->previous = msg->previous;
/* without predecessor the message is the head: the list then starts at its successor */
427 if (msg->previous == NULL)
428 msg->wsj1->messages = msg->next;
430 msg->previous->next = msg->next;
431 pthread_mutex_unlock(&msg->wsj1->mutex);
432 /* free resources */
433 afb_wsj1_unref(msg->wsj1);
434 json_object_put(msg->object_j);
/* Returns the payload of 'msg' as JSON text (msg->object_s). */
440 const char *afb_wsj1_msg_object_s(struct afb_wsj1_msg *msg)
442 return msg->object_s;
/*
 * Provides the payload of 'msg' as a json_object.
 * The text object_s is parsed on first use and the result is kept in
 * msg->object_j. Text that fails to parse is wrapped as a JSON string.
 * NOTE(review): the return statement is not visible here.
 */
445 struct json_object *afb_wsj1_msg_object_j(struct afb_wsj1_msg *msg)
447 enum json_tokener_error jerr;
448 struct json_object *object = msg->object_j;
449 if (object == NULL) {
/* the tokener belongs to the connection: it is reset and used with the mutex held */
450 pthread_mutex_lock(&msg->wsj1->mutex);
451 json_tokener_reset(msg->wsj1->tokener);
/* the length given includes the terminating NUL */
452 object = json_tokener_parse_ex(msg->wsj1->tokener, msg->object_s, 1 + (int)msg->object_s_length);
453 jerr = json_tokener_get_error(msg->wsj1->tokener);
454 pthread_mutex_unlock(&msg->wsj1->mutex);
455 if (jerr != json_tokener_success) {
456 /* lazy error detection: text that fails to parse is kept as a JSON string (to be improved?) */
457 object = json_object_new_string_len(msg->object_s, (int)msg->object_s_length);
459 msg->object_j = object;
/* Returns non-zero when the code of 'msg' is CALL. */
464 int afb_wsj1_msg_is_call(struct afb_wsj1_msg *msg)
466 return msg->code == CALL;
/* Returns non-zero when 'msg' is a reply, successful (RETOK) or not (RETERR). */
469 int afb_wsj1_msg_is_reply(struct afb_wsj1_msg *msg)
471 return msg->code == RETOK || msg->code == RETERR;
/* Returns non-zero when the code of 'msg' is RETOK. */
474 int afb_wsj1_msg_is_reply_ok(struct afb_wsj1_msg *msg)
476 return msg->code == RETOK;
/* Returns non-zero when the code of 'msg' is RETERR. */
479 int afb_wsj1_msg_is_reply_error(struct afb_wsj1_msg *msg)
481 return msg->code == RETERR;
/* Returns non-zero when the code of 'msg' is EVENT. */
484 int afb_wsj1_msg_is_event(struct afb_wsj1_msg *msg)
486 return msg->code == EVENT;
/* Accessor for the api of 'msg'. NOTE(review): body not visible here; presumably returns msg->api -- confirm. */
489 const char *afb_wsj1_msg_api(struct afb_wsj1_msg *msg)
/* Accessor for the verb of 'msg'. NOTE(review): body not visible here; presumably returns msg->verb -- confirm. */
494 const char *afb_wsj1_msg_verb(struct afb_wsj1_msg *msg)
/* Accessor for the event name of 'msg'. NOTE(review): body not visible here; presumably returns msg->event -- confirm. */
499 const char *afb_wsj1_msg_event(struct afb_wsj1_msg *msg)
/* Accessor for the token of 'msg'. NOTE(review): body not visible here; presumably returns msg->token, NULL when the message carried none -- confirm. */
504 const char *afb_wsj1_msg_token(struct afb_wsj1_msg *msg)
/* Accessor for the connection of 'msg'. NOTE(review): body not visible here; presumably returns msg->wsj1 -- confirm. */
509 struct afb_wsj1 *afb_wsj1_msg_wsj1(struct afb_wsj1_msg *msg)
/* Closes the websocket of 'wsj1': 'code' and 'text' are forwarded to afb_ws_close(), whose result is returned. */
514 int afb_wsj1_close(struct afb_wsj1 *wsj1, uint16_t code, const char *text)
516 return afb_ws_close(wsj1->ws, code, text);
/*
 * Sends the text [i1,"s1",o1] or, when 't1' is not NULL, [i1,"s1",o1,"t1"].
 * 'i1' is emitted as the single character '0' + i1; a NULL 'o1' is sent as
 * null. Returns the result of afb_ws_texts().
 */
519 static int wsj1_send_isot(struct afb_wsj1 *wsj1, int i1, const char *s1, const char *o1, const char *t1)
521 char code[2] = { (char)('0' + i1), 0 };
/* with a NULL t1 the fragment before it is "]" and t1 itself ends the argument list (assuming afb_ws_texts() stops at the first NULL -- confirm) */
522 return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",", o1 == NULL ? "null" : o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL);
/*
 * Sends the text [i1,"s1","s2",o1] or, when 't1' is not NULL,
 * [i1,"s1","s2",o1,"t1"].
 * 'i1' is emitted as the single character '0' + i1; a NULL 'o1' is sent as
 * null. Returns the result of afb_ws_texts().
 */
525 static int wsj1_send_issot(struct afb_wsj1 *wsj1, int i1, const char *s1, const char *s2, const char *o1, const char *t1)
527 char code[2] = { (char)('0' + i1), 0 };
/* with a NULL t1 the fragment before it is "]" and t1 itself ends the argument list (assuming afb_ws_texts() stops at the first NULL -- confirm) */
528 return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",\"", s2, "\",", o1 == NULL ? "null" : o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL);
/*
 * Sends the event 'event' with the JSON 'object' as payload.
 * The object is serialised in plain form and sent through
 * afb_wsj1_send_event_s(); the caller's reference on 'object' is then
 * released with json_object_put(), whatever the result of the sending.
 */
531 int afb_wsj1_send_event_j(struct afb_wsj1 *wsj1, const char *event, struct json_object *object)
533 const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN|JSON_C_TO_STRING_NOSLASHESCAPE);
534 int rc = afb_wsj1_send_event_s(wsj1, event, objstr);
/* put only after sending: objstr is obtained from 'object' */
535 json_object_put(object);
/* Sends an EVENT message named 'event' whose payload is the JSON text 'object' (NULL is sent as null); no token is attached. */
539 int afb_wsj1_send_event_s(struct afb_wsj1 *wsj1, const char *event, const char *object)
541 return wsj1_send_isot(wsj1, EVENT, event, object, NULL);
/*
 * Calls 'verb' of 'api' with the JSON 'object' as argument.
 * The object is serialised in plain form and passed to afb_wsj1_call_s();
 * the caller's reference on 'object' is then released with
 * json_object_put(), whatever the result of the call.
 */
544 int afb_wsj1_call_j(struct afb_wsj1 *wsj1, const char *api, const char *verb, struct json_object *object, void (*on_reply)(void *closure, struct afb_wsj1_msg *msg), void *closure)
546 const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN|JSON_C_TO_STRING_NOSLASHESCAPE);
547 int rc = afb_wsj1_call_s(wsj1, api, verb, objstr, on_reply, closure);
/* put only after sending: objstr is obtained from 'object' */
548 json_object_put(object);
/*
 * Calls 'verb' of 'api' with the JSON text 'object' as argument.
 * A pending-call record holding 'on_reply' and 'closure' is created, then
 * the CALL message [2,"id","api/verb",object] is sent.
 */
552 int afb_wsj1_call_s(struct afb_wsj1 *wsj1, const char *api, const char *verb, const char *object, void (*on_reply)(void *closure, struct afb_wsj1_msg *msg), void *closure)
555 struct wsj1_call *call;
558 /* allocates the call */
559 call = wsj1_call_create(wsj1, on_reply, closure);
/* "api/verb" is built on the stack: 2 extra bytes for the '/' and the terminating NUL */
566 tag = alloca(2 + strlen(api) + strlen(verb));
567 stpcpy(stpcpy(stpcpy(tag, api), "/"), verb);
570 rc = wsj1_send_issot(wsj1, CALL, call->id, tag, object, NULL);
/* search with the remove flag set; presumably done when the sending failed (condition not visible here) */
572 wsj1_call_search(wsj1, call->id, 1);
/*
 * Replies to 'msg' with the JSON 'object'.
 * The object is serialised in plain form and passed to afb_wsj1_reply_s();
 * the caller's reference on 'object' is then released with
 * json_object_put(), whatever the result of the sending.
 */
578 int afb_wsj1_reply_j(struct afb_wsj1_msg *msg, struct json_object *object, const char *token, int iserror)
580 const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN|JSON_C_TO_STRING_NOSLASHESCAPE);
581 int rc = afb_wsj1_reply_s(msg, objstr, token, iserror);
/* put only after sending: objstr is obtained from 'object' */
582 json_object_put(object);
/*
 * Replies to 'msg' on its connection: sends RETERR when 'iserror' is
 * non-zero and RETOK otherwise, with the id of 'msg', the JSON text
 * 'object' (NULL is sent as null) and, when not NULL, 'token'.
 */
586 int afb_wsj1_reply_s(struct afb_wsj1_msg *msg, const char *object, const char *token, int iserror)
588 return wsj1_send_isot(msg->wsj1, iserror ? RETERR : RETOK, msg->id, object, token);