Simplify functions for calls
[src/app-framework-binder.git] / src / afb-api-ws.c
1 /*
2  * Copyright (C) 2015, 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19 #define NO_PLUGIN_VERBOSE_MACRO
20
21 #include <stdlib.h>
22 #include <string.h>
23 #include <assert.h>
24 #include <fcntl.h>
25 #include <unistd.h>
26 #include <errno.h>
27 #include <endian.h>
28 #include <netdb.h>
29 #include <sys/types.h>
30 #include <sys/socket.h>
31 #include <sys/un.h>
32
33 #include <json-c/json.h>
34 #include <systemd/sd-event.h>
35
36 #include <afb/afb-req-itf.h>
37
38 #include "afb-common.h"
39
40 #include "afb-session.h"
41 #include "afb-ws.h"
42 #include "afb-msg-json.h"
43 #include "afb-apis.h"
44 #include "afb-api-so.h"
45 #include "afb-context.h"
46 #include "afb-evt.h"
47 #include "afb-subcall.h"
48 #include "verbose.h"
49 #include "sd-fds.h"
50
/* structures declared here and defined further down in this file */
struct api_ws_client;
struct api_ws_event;
struct api_ws_memo;
54
55
56
57 /*
58  */
59 struct api_ws
60 {
61         char *path;             /* path of the object for the API */
62         char *api;              /* api name of the interface */
63         int fd;                 /* file descriptor */
64         union {
65                 struct {
66                         uint32_t id;
67                         struct afb_ws *ws;
68                         struct api_ws_event *events;
69                         struct api_ws_memo *memos;
70                 } client;
71                 struct {
72                         sd_event_source *listensrc;
73                         struct afb_evt_listener *listener; /* listener for broadcasted events */
74                 } server;
75         };
76 };
77
/*
 * NOTE(review): these codes are not referenced in the visible part of
 * this file; presumably they tag the kind of a reply - confirm usage.
 */
#define RETOK	1
#define RETERR	2
#define RETRAW	3
81
82 /******************* websocket interface for client part **********************************/
83
84 static void api_ws_client_on_binary(void *closure, char *data, size_t size);
85
86 static const struct afb_ws_itf api_ws_client_ws_itf =
87 {
88         .on_close = NULL,
89         .on_text = NULL,
90         .on_binary = api_ws_client_on_binary,
91         .on_error = NULL,
92         .on_hangup = NULL
93 };
94
95 /******************* event structures for server part **********************************/
96
97 static void api_ws_server_event_add(void *closure, const char *event, int eventid);
98 static void api_ws_server_event_remove(void *closure, const char *event, int eventid);
99 static void api_ws_server_event_push(void *closure, const char *event, int eventid, struct json_object *object);
100 static void api_ws_server_event_broadcast(void *closure, const char *event, int eventid, struct json_object *object);
101
102 /* the interface for events pushing */
103 static const struct afb_evt_itf api_ws_server_evt_itf = {
104         .broadcast = api_ws_server_event_broadcast,
105         .push = api_ws_server_event_push,
106         .add = api_ws_server_event_add,
107         .remove = api_ws_server_event_remove
108 };
109
110 /******************* client description part for server *****************************/
111
/* description, for the server, of one of its clients */
struct api_ws_client
{
	const char *api;			/* the server ws-api */
	int refcount;				/* count of references */
	struct afb_evt_listener *listener;	/* listener for events */
	int fd;					/* file descriptor */
	struct afb_ws *ws;			/* websocket */
};
129
/******************* websocket interface for server part **********************************/
131
132 static void api_ws_server_on_binary(void *closure, char *data, size_t size);
133 static void api_ws_server_on_hangup(void *closure);
134
135 static const struct afb_ws_itf api_ws_server_ws_itf =
136 {
137         .on_close = NULL,
138         .on_text = NULL,
139         .on_binary = api_ws_server_on_binary,
140         .on_error = NULL,
141         .on_hangup = api_ws_server_on_hangup
142 };
143
144 /******************* ws request part for server *****************/
145
146 /*
147  * structure for a ws request
148  */
149 struct api_ws_server_req {
150         struct afb_context context;     /* the context, should be THE FIRST */
151         struct api_ws_client *client;   /* the client of the request */
152         char *rcvdata;                  /* the received data to free */
153         struct json_object *json;       /* the readen request as object */
154         const char *request;            /* the readen request as string */
155         size_t lenreq;                  /* the length of the request */
156         int refcount;                   /* reference count of the request */
157         uint32_t msgid;                 /* the incoming request msgid */
158 };
159
/* internal helpers of the server requests */
static struct json_object *api_ws_server_req_json(struct api_ws_server_req *wreq);
static void api_ws_server_req_unref(struct api_ws_server_req *wreq);

/* callbacks of the request interface: access to the request data */
static struct json_object *api_ws_server_req_json_cb(void *closure);
static struct afb_arg api_ws_server_req_get_cb(void *closure, const char *name);
static const char *api_ws_server_req_raw_cb(void *closure, size_t *size);

/* callbacks of the request interface: replies */
static void api_ws_server_req_success_cb(void *closure, struct json_object *obj, const char *info);
static void api_ws_server_req_fail_cb(void *closure, const char *status, const char *info);
static void api_ws_server_req_send_cb(void *closure, const char *buffer, size_t size);

/* callbacks of the request interface: reference counting */
static void api_ws_server_req_addref_cb(void *closure);
static void api_ws_server_req_unref_cb(void *closure);

/* callbacks of the request interface: events and sub-calls */
static int api_ws_server_req_subscribe_cb(void *closure, struct afb_event event);
static int api_ws_server_req_unsubscribe_cb(void *closure, struct afb_event event);
static void api_ws_server_req_subcall_cb(void *closure, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure);
174
175 const struct afb_req_itf afb_api_ws_req_itf = {
176         .json = api_ws_server_req_json_cb,
177         .get = api_ws_server_req_get_cb,
178         .success = api_ws_server_req_success_cb,
179         .fail = api_ws_server_req_fail_cb,
180         .raw = api_ws_server_req_raw_cb,
181         .send = api_ws_server_req_send_cb,
182         .context_get = (void*)afb_context_get,
183         .context_set = (void*)afb_context_set,
184         .addref = api_ws_server_req_addref_cb,
185         .unref = api_ws_server_req_unref_cb,
186         .session_close = (void*)afb_context_close,
187         .session_set_LOA = (void*)afb_context_change_loa,
188         .subscribe = api_ws_server_req_subscribe_cb,
189         .unsubscribe = api_ws_server_req_unsubscribe_cb,
190         .subcall = api_ws_server_req_subcall_cb
191 };
192
193 /******************* common part **********************************/
194
195 /*
196  * create a structure api_ws not connected to the 'path'.
197  */
198 static struct api_ws *api_ws_make(const char *path)
199 {
200         struct api_ws *api;
201         size_t length;
202
203         /* allocates the structure */
204         length = strlen(path);
205         api = calloc(1, sizeof *api + 1 + length);
206         if (api == NULL) {
207                 errno = ENOMEM;
208                 goto error;
209         }
210
211         /* path is copied after the struct */
212         api->path = (char*)(api+1);
213         memcpy(api->path, path, length + 1);
214
215         /* api name is at the end of the path */
216         while (length && path[length - 1] != '/' && path[length - 1] != ':')
217                 length = length - 1;
218         api->api = &api->path[length];
219         if (api->api == NULL || !afb_apis_is_valid_api_name(++api->api)) {
220                 errno = EINVAL;
221                 goto error2;
222         }
223
224         api->fd = -1;
225         return api;
226
227 error2:
228         free(api);
229 error:
230         return NULL;
231 }
232
/*
 * Creates a unix stream socket for the 'path'.
 *
 * When 'server' isn't zero, the file of 'path' is removed if it exists
 * and the socket is bound to 'path'. Otherwise the socket is connected
 * to 'path'.
 *
 * Returns the file descriptor of the socket or a negative value with
 * errno set accordingly (ENAMETOOLONG when 'path' doesn't fit in the
 * address of a unix socket).
 */
static int api_ws_socket_unix(const char *path, int server)
{
	int fd, rc, saved_errno;
	struct sockaddr_un addr;
	size_t length;

	/* the path and its terminating zero must fit in sun_path */
	length = strlen(path);
	if (length >= sizeof addr.sun_path) {
		errno = ENAMETOOLONG;
		return -1;
	}

	/* the server removes the previous socket file, if any, before binding */
	if (server)
		unlink(path);

	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (fd < 0)
		return fd;

	memset(&addr, 0, sizeof addr);
	addr.sun_family = AF_UNIX;
	memcpy(addr.sun_path, path, length + 1);
	if (server)
		rc = bind(fd, (struct sockaddr *) &addr, (socklen_t)(sizeof addr));
	else
		rc = connect(fd, (struct sockaddr *) &addr, (socklen_t)(sizeof addr));
	if (rc < 0) {
		/* close can change errno: keeps the cause of the failure */
		saved_errno = errno;
		close(fd);
		errno = saved_errno;
		return rc;
	}
	return fd;
}
266
/*
 * Creates an inet (IPv4) stream socket for the 'path' whose form is
 * "host:service/api".
 *
 * When 'server' isn't zero, the socket is bound to the resolved address,
 * with the option SO_REUSEADDR set. Otherwise the socket is connected
 * to the resolved address.
 *
 * Returns the file descriptor of the socket or -1 on error (errno is
 * set to EINVAL when the path is malformed or can't be resolved).
 */
static int api_ws_socket_inet(const char *path, int server)
{
	int rc, fd, reuse;
	const char *service, *host, *api;
	struct addrinfo hint, *rai, *iai;

	/* scan the uri: the service is between the last ':' and the last '/' */
	api = strrchr(path, '/');
	service = strrchr(path, ':');
	if (api == NULL || service == NULL || api < service) {
		errno = EINVAL;
		return -1;
	}
	host = strndupa(path, service++ - path);
	service = strndupa(service, api - service);

	/* get addr */
	memset(&hint, 0, sizeof hint);
	hint.ai_family = AF_INET;
	hint.ai_socktype = SOCK_STREAM;
	rc = getaddrinfo(host, service, &hint, &rai);
	if (rc != 0) {
		errno = EINVAL;
		return -1;
	}

	/* get the socket: the first address that works is used */
	for (iai = rai ; iai != NULL ; iai = iai->ai_next) {
		fd = socket(iai->ai_family, iai->ai_socktype, iai->ai_protocol);
		if (fd < 0)
			continue;
		if (server) {
			/* SO_REUSEADDR must be set before bind to be effective */
			reuse = 1;
			setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof reuse);
			rc = bind(fd, iai->ai_addr, iai->ai_addrlen);
		} else {
			rc = connect(fd, iai->ai_addr, iai->ai_addrlen);
		}
		if (rc == 0) {
			freeaddrinfo(rai);
			return fd;
		}
		close(fd);
	}
	freeaddrinfo(rai);
	return -1;
}
315
/*
 * Creates the socket described by 'path':
 *  - "sd:NAME" is a socket got using sd_fds_for(NAME)
 *  - "unix:PATH" is a unix socket
 *  - anything else is treated as an inet socket
 *
 * When 'server' isn't zero, the unix and inet sockets are put in the
 * listening state. The returned socket is marked close-on-exec and
 * non blocking.
 *
 * Returns the file descriptor of the socket or a negative value on error.
 */
static int api_ws_socket(const char *path, int server)
{
	int fd, rc, flags, saved_errno;

	/* check for systemd socket */
	if (0 == strncmp(path, "sd:", 3))
		fd = sd_fds_for(path + 3);
	else {
		/* check for unix socket */
		if (0 == strncmp(path, "unix:", 5))
			/* unix socket */
			fd = api_ws_socket_unix(path + 5, server);
		else
			/* inet socket */
			fd = api_ws_socket_inet(path, server);

		if (fd >= 0 && server) {
			/* NOTE(review): the socket is already bound at this point */
			rc = 1;
			setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &rc, sizeof rc);

			/* a server socket that can't listen is of no use */
			rc = listen(fd, 5);
			if (rc < 0) {
				saved_errno = errno;
				close(fd);
				errno = saved_errno;
				return -1;
			}
		}
	}

	/* configure the socket, keeping the flags that are already set */
	if (fd >= 0) {
		flags = fcntl(fd, F_GETFD);
		fcntl(fd, F_SETFD, (flags < 0 ? 0 : flags) | FD_CLOEXEC);
		flags = fcntl(fd, F_GETFL);
		fcntl(fd, F_SETFL, (flags < 0 ? 0 : flags) | O_NONBLOCK);
	}
	return fd;
}
345
346 /******************* serialisation part **********************************/
347
/* reading cursor on the data of a received message */
struct readbuf
{
	char *head;	/* the next byte to read */
	char *end;	/* the byte just after the last byte of the data */
};
352
/* maximum count of parts of a message to write */
#define WRITEBUF_COUNT_MAX  32

/* gathering of the parts of a message to write */
struct writebuf
{
	/* the parts of the message */
	struct iovec iovec[WRITEBUF_COUNT_MAX];

	/* storage of the small values (integers, chars), one per part */
	uint32_t uints[WRITEBUF_COUNT_MAX];

	/* count of parts currently set */
	int count;
};
360
361 static char *api_ws_read_get(struct readbuf *rb, uint32_t length)
362 {
363         char *before = rb->head;
364         char *after = before + length;
365         if (after > rb->end)
366                 return 0;
367         rb->head = after;
368         return before;
369 }
370
371 static int api_ws_read_uint32(struct readbuf *rb, uint32_t *value)
372 {
373         char *after = rb->head + sizeof *value;
374         if (after > rb->end)
375                 return 0;
376         memcpy(value, rb->head, sizeof *value);
377         rb->head = after;
378         *value = le32toh(*value);
379         return 1;
380 }
381
382 static int api_ws_read_string(struct readbuf *rb, const char **value, size_t *length)
383 {
384         uint32_t len;
385         if (!api_ws_read_uint32(rb, &len) || !len)
386                 return 0;
387         if (length)
388                 *length = (size_t)(len - 1);
389         return (*value = api_ws_read_get(rb, len)) != NULL &&  rb->head[-1] == 0;
390 }
391
/*
 * Reads from 'rb' a JSON object transmitted as a string and stores the
 * parsed object in 'object'.
 *
 * Returns 1 on success or 0 on error. A NULL object is the expected
 * result only when the text is exactly "null"; for any other text a
 * NULL object is an error.
 */
static int api_ws_read_object(struct readbuf *rb, struct json_object **object)
{
	const char *text;
	size_t textlen;
	int parsed, expected;

	if (!api_ws_read_string(rb, &text, &textlen))
		return 0;

	*object = json_tokener_parse(text);
	parsed = *object != NULL;
	expected = strcmp(text, "null") != 0;
	return parsed == expected;
}
398
399 static int api_ws_write_put(struct writebuf *wb, const void *value, size_t length)
400 {
401         int i = wb->count;
402         if (i == WRITEBUF_COUNT_MAX)
403                 return 0;
404         wb->iovec[i].iov_base = (void*)value;
405         wb->iovec[i].iov_len = length;
406         wb->count = i + 1;
407         return 1;
408 }
409
410 static int api_ws_write_char(struct writebuf *wb, char value)
411 {
412         int i = wb->count;
413         if (i == WRITEBUF_COUNT_MAX)
414                 return 0;
415         *(char*)&wb->uints[i] = value;
416         wb->iovec[i].iov_base = &wb->uints[i];
417         wb->iovec[i].iov_len = 1;
418         wb->count = i + 1;
419         return 1;
420 }
421
422 static int api_ws_write_uint32(struct writebuf *wb, uint32_t value)
423 {
424         int i = wb->count;
425         if (i == WRITEBUF_COUNT_MAX)
426                 return 0;
427         wb->uints[i] = htole32(value);
428         wb->iovec[i].iov_base = &wb->uints[i];
429         wb->iovec[i].iov_len = sizeof wb->uints[i];
430         wb->count = i + 1;
431         return 1;
432 }
433
/*
 * Adds to 'wb' the string 'value' of 'length' characters: its size on
 * 32 bits, terminating zero included, followed by the 'length' + 1
 * bytes at 'value'.
 *
 * Returns 1 on success or 0 when the size doesn't fit 32 bits or when
 * 'wb' is full.
 */
static int api_ws_write_string_length(struct writebuf *wb, const char *value, size_t length)
{
	size_t total = length + 1;
	uint32_t size = (uint32_t)total;

	/* detects the sizes truncated by the conversion to 32 bits */
	if ((size_t)size != total || size == 0)
		return 0;

	return api_ws_write_uint32(wb, size) && api_ws_write_put(wb, value, total);
}
439
/*
 * Adds to 'wb' the zero terminated string 'value'.
 *
 * Returns 1 on success or 0 on error.
 */
static int api_ws_write_string(struct writebuf *wb, const char *value)
{
	size_t length = strlen(value);

	return api_ws_write_string_length(wb, value, length);
}
444
445 static int api_ws_write_object(struct writebuf *wb, struct json_object *object)
446 {
447         const char *string = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
448         return string != NULL && api_ws_write_string(wb, string);
449 }
450
451
452
453
454 /******************* client part **********************************/
455
456 /*
457  * structure for recording query data
458  */
459 struct api_ws_memo {
460         struct api_ws_memo *next;               /* the next memo */
461         struct api_ws *api;             /* the ws api */
462         struct afb_req req;             /* the request handle */
463         struct afb_context *context;    /* the context of the query */
464         uint32_t msgid;                 /* the message identifier */
465 };
466
467 struct api_ws_event
468 {
469         struct api_ws_event *next;
470         struct afb_event event;
471         int eventid;
472         int refcount;
473 };
474
475 /* search a memorized request */
476 static struct api_ws_memo *api_ws_client_memo_search(struct api_ws *api, uint32_t msgid)
477 {
478         struct api_ws_memo *memo;
479
480         memo = api->client.memos;
481         while (memo != NULL && memo->msgid != msgid)
482                 memo = memo->next;
483
484         return memo;
485 }
486
487 /* search the event */
488 static struct api_ws_event *api_ws_client_event_search(struct api_ws *api, uint32_t eventid, const char *name)
489 {
490         struct api_ws_event *ev;
491
492         ev = api->client.events;
493         while (ev != NULL && (ev->eventid != eventid || 0 != strcmp(afb_evt_event_name(ev->event), name)))
494                 ev = ev->next;
495
496         return ev;
497 }
498
499
500 /* allocates and init the memorizing data */
501 static struct api_ws_memo *api_ws_client_memo_make(struct api_ws *api, struct afb_req req, struct afb_context *context)
502 {
503         struct api_ws_memo *memo;
504
505         memo = malloc(sizeof *memo);
506         if (memo != NULL) {
507                 afb_req_addref(req);
508                 memo->req = req;
509                 memo->context = context;
510                 do { memo->msgid = ++api->client.id; } while(api_ws_client_memo_search(api, memo->msgid) != NULL);
511                 memo->api = api;
512                 memo->next = api->client.memos;
513                 api->client.memos = memo;
514         }
515         return memo;
516 }
517
518 /* free and release the memorizing data */
519 static void api_ws_client_memo_destroy(struct api_ws_memo *memo)
520 {
521         struct api_ws_memo **prv;
522
523         prv = &memo->api->client.memos;
524         while (*prv != NULL) {
525                 if (*prv == memo) {
526                         *prv = memo->next;
527                         break;
528                 }
529                 prv = &(*prv)->next;
530         }
531
532         afb_req_unref(memo->req);
533         free(memo);
534 }
535
/*
 * Reads from 'rb' the data of an event: its identifier then its name.
 * Returns 1 on success or 0 when the message is invalid.
 */
static int api_ws_client_msg_event_read(struct readbuf *rb, uint32_t *eventid, const char **name)
{
	if (!api_ws_read_uint32(rb, eventid))
		return 0;
	return api_ws_read_string(rb, name, NULL);
}
541
/*
 * Reads from 'rb' the data of an event and stores in 'ev' the matching
 * event known by 'api'.
 *
 * Returns 1 on success or 0, after logging the error, when the message
 * is invalid or when the event isn't known.
 */
static int api_ws_client_msg_event_get(struct api_ws *api, struct readbuf *rb, struct api_ws_event **ev)
{
	uint32_t eventid;
	const char *name;
	struct api_ws_event *found;

	/* get event data from the message */
	if (!api_ws_client_msg_event_read(rb, &eventid, &name)) {
		ERROR("Invalid message");
		return 0;
	}

	/* the event must be known */
	found = api_ws_client_event_search(api, eventid, name);
	*ev = found;
	if (found != NULL)
		return 1;

	ERROR("event %s not found", name);
	return 0;
}
563
/*
 * Reads from 'rb' the identifier of a message and stores in 'memo' the
 * matching memorized request of 'api'.
 *
 * Returns 1 on success or 0, after logging the error, when the message
 * is invalid or when no memorized request matches.
 */
static int api_ws_client_msg_memo_get(struct api_ws *api, struct readbuf *rb, struct api_ws_memo **memo)
{
	uint32_t msgid;
	struct api_ws_memo *found;

	/* get the message identifier */
	if (!api_ws_read_uint32(rb, &msgid)) {
		ERROR("Invalid message");
		return 0;
	}

	/* get the memo */
	found = api_ws_client_memo_search(api, msgid);
	*memo = found;
	if (found != NULL)
		return 1;

	ERROR("message not found");
	return 0;
}
584
/*
 * Reads from 'rb' a subscription message: the identifier of the request
 * followed by the data of the event. The memorized request is stored in
 * 'memo' and the event in 'ev'.
 *
 * Returns 1 on success or 0 on error.
 */
static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_event **ev, struct api_ws_memo **memo)
{
	if (!api_ws_client_msg_memo_get(api, rb, memo))
		return 0;
	return api_ws_client_msg_event_get(api, rb, ev);
}
590
591 /* adds an event */
592 static void api_ws_client_event_create(struct api_ws *api, struct readbuf *rb)
593 {
594         size_t offset;
595         const char *name;
596         uint32_t eventid;
597         struct api_ws_event *ev;
598
599         /* get event data from the message */
600         offset = api_ws_client_msg_event_read(rb, &eventid, &name);
601         if (offset == 0) {
602                 ERROR("Invalid message");
603                 return;
604         }
605
606         /* check conflicts */
607         ev = api_ws_client_event_search(api, eventid, name);
608         if (ev != NULL) {
609                 ev->refcount++;
610                 return;
611         }
612
613         /* no conflict, try to add it */
614         ev = malloc(sizeof *ev);
615         if (ev != NULL) {
616                 ev->event = afb_evt_create_event(name);
617                 if (ev->event.closure == NULL)
618                         free(ev);
619                 else {
620                         ev->refcount = 1;
621                         ev->eventid = eventid;
622                         ev->next = api->client.events;
623                         api->client.events = ev;
624                         return;
625                 }
626         }
627         ERROR("can't create event %s, out of memory", name);
628 }
629
630 /* removes an event */
631 static void api_ws_client_event_drop(struct api_ws *api, struct readbuf *rb)
632 {
633         struct api_ws_event *ev, **prv;
634
635         /* retrieves the event */
636         if (!api_ws_client_msg_event_get(api, rb, &ev))
637                 return;
638
639         /* decrease the reference count */
640         if (--ev->refcount)
641                 return;
642
643         /* unlinks the event */
644         prv = &api->client.events;
645         while (*prv != ev)
646                 prv = &(*prv)->next;
647         *prv = ev->next;
648
649         /* destroys the event */
650         afb_event_drop(ev->event);
651         free(ev);
652 }
653
654 /* subscribes an event */
655 static void api_ws_client_event_subscribe(struct api_ws *api, struct readbuf *rb)
656 {
657         struct api_ws_event *ev;
658         struct api_ws_memo *memo;
659
660         if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) {
661                 /* subscribe the request from the event */
662                 if (afb_req_subscribe(memo->req, ev->event) < 0)
663                         ERROR("can't subscribe: %m");
664         }
665 }
666
667 /* unsubscribes an event */
668 static void api_ws_client_event_unsubscribe(struct api_ws *api, struct readbuf *rb)
669 {
670         struct api_ws_event *ev;
671         struct api_ws_memo *memo;
672
673         if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) {
674                 /* unsubscribe the request from the event */
675                 if (afb_req_unsubscribe(memo->req, ev->event) < 0)
676                         ERROR("can't unsubscribe: %m");
677         }
678 }
679
/*
 * Handles the broadcast message read from 'rb': the name of the event
 * followed by its object, that are broadcasted locally.
 */
static void api_ws_client_event_broadcast(struct api_ws *api, struct readbuf *rb)
{
	const char *event;
	struct json_object *object;

	if (!api_ws_read_string(rb, &event, NULL) || !api_ws_read_object(rb, &object)) {
		ERROR("unreadable broadcasted event");
		return;
	}
	afb_evt_broadcast(event, object);
}
691
692 /* pushs an event */
693 static void api_ws_client_event_push(struct api_ws *api, struct readbuf *rb)
694 {
695         struct api_ws_event *ev;
696         struct json_object *object;
697
698         if (api_ws_client_msg_event_get(api, rb, &ev) && api_ws_read_object(rb, &object))
699                 afb_event_push(ev->event, object);
700         else
701                 ERROR("unreadable push event");
702 }
703
704 static void api_ws_client_reply_success(struct api_ws *api, struct readbuf *rb)
705 {
706         struct api_ws_memo *memo;
707         struct json_object *object;
708         const char *info;
709         uint32_t flags;
710
711         /* retrieve the message data */
712         if (!api_ws_client_msg_memo_get(api, rb, &memo))
713                 return;
714
715         if (api_ws_read_uint32(rb, &flags)
716          && api_ws_read_string(rb, &info, NULL)
717          && api_ws_read_object(rb, &object)) {
718                 memo->context->flags = (unsigned)flags;
719                 afb_req_success(memo->req, object, *info ? info : NULL);
720         } else {
721                 /* failing to have the answer */
722                 afb_req_fail(memo->req, "error", "ws error");
723         }
724         api_ws_client_memo_destroy(memo);
725 }
726
727 static void api_ws_client_reply_fail(struct api_ws *api, struct readbuf *rb)
728 {
729         struct api_ws_memo *memo;
730         const char *info, *status;
731         uint32_t flags;
732
733         /* retrieve the message data */
734         if (!api_ws_client_msg_memo_get(api, rb, &memo))
735                 return;
736
737         if (api_ws_read_uint32(rb, &flags)
738          && api_ws_read_string(rb, &status, NULL)
739          && api_ws_read_string(rb, &info, NULL)) {
740                 memo->context->flags = (unsigned)flags;
741                 afb_req_fail(memo->req, status, *info ? info : NULL);
742         } else {
743                 /* failing to have the answer */
744                 afb_req_fail(memo->req, "error", "ws error");
745         }
746         api_ws_client_memo_destroy(memo);
747 }
748
749 static void api_ws_client_reply_send(struct api_ws *api, struct readbuf *rb)
750 {
751         struct api_ws_memo *memo;
752         const char *data;
753         size_t length;
754         uint32_t flags;
755
756         /* retrieve the message data */
757         if (!api_ws_client_msg_memo_get(api, rb, &memo))
758                 return;
759
760         if (api_ws_read_uint32(rb, &flags)
761          && api_ws_read_string(rb, &data, &length)) {
762                 memo->context->flags = (unsigned)flags;
763                 afb_req_send(memo->req, data, length);
764         } else {
765                 /* failing to have the answer */
766                 afb_req_fail(memo->req, "error", "ws error");
767         }
768         api_ws_client_memo_destroy(memo);
769 }
770
771 /* callback when receiving binary data */
772 static void api_ws_client_on_binary(void *closure, char *data, size_t size)
773 {
774         if (size > 0) {
775                 struct readbuf rb = { .head = data, .end = data + size };
776                 switch (*rb.head++) {
777                 case 'T': /* success */
778                         api_ws_client_reply_success(closure, &rb);
779                         break;
780                 case 'F': /* fail */
781                         api_ws_client_reply_fail(closure, &rb);
782                         break;
783                 case 'X': /* send */
784                         api_ws_client_reply_send(closure, &rb);
785                         break;
786                 case '*': /* broadcast */
787                         api_ws_client_event_broadcast(closure, &rb);
788                         break;
789                 case '+': /* creates the event */
790                         api_ws_client_event_create(closure, &rb);
791                         break;
792                 case '-': /* drops the event */
793                         api_ws_client_event_drop(closure, &rb);
794                         break;
795                 case '!': /* pushs the event */
796                         api_ws_client_event_push(closure, &rb);
797                         break;
798                 case 'S': /* subscribe event for a request */
799                         api_ws_client_event_subscribe(closure, &rb);
800                         break;
801                 case 'U': /* unsubscribe event for a request */
802                         api_ws_client_event_unsubscribe(closure, &rb);
803                         break;
804                 default: /* unexpected message */
805                         break;
806                 }
807         }
808         free(data);
809 }
810
811 /* on call, propagate it to the ws service */
812 static void api_ws_client_call_cb(void * closure, struct afb_req req, struct afb_context *context, const char *verb)
813 {
814         int rc;
815         struct api_ws_memo *memo;
816         struct writebuf wb = { .count = 0 };
817         const char *raw;
818         size_t szraw;
819         struct api_ws *api = closure;
820
821         /* create the recording data */
822         memo = api_ws_client_memo_make(api, req, context);
823         if (memo == NULL) {
824                 afb_req_fail(req, "error", "out of memory");
825                 return;
826         }
827
828         /* creates the call message */
829         raw = afb_req_raw(req, &szraw);
830         if (raw == NULL)
831                 goto internal_error;
832         if (!api_ws_write_uint32(&wb, memo->msgid)
833          || !api_ws_write_uint32(&wb, (uint32_t)context->flags)
834          || !api_ws_write_string(&wb, verb)
835          || !api_ws_write_string(&wb, afb_session_uuid(context->session))
836          || !api_ws_write_string_length(&wb, raw, szraw))
837                 goto overflow;
838
839         /* send */
840         rc = afb_ws_binary_v(api->client.ws, wb.iovec, wb.count);
841         if (rc < 0)
842                 goto ws_send_error;
843         return;
844
845 ws_send_error:
846         afb_req_fail(req, "error", "websocket sending error");
847         goto clean_memo;
848
849 internal_error:
850         afb_req_fail(req, "error", "internal: raw is NULL!");
851         goto clean_memo;
852
853 overflow:
854         afb_req_fail(req, "error", "overflow: size doesn't match 32 bits!");
855
856 clean_memo:
857         api_ws_client_memo_destroy(memo);
858 }
859
860 static int api_ws_service_start_cb(void *closure, int share_session, int onneed)
861 {
862         struct api_ws *api = closure;
863
864         /* not an error when onneed */
865         if (onneed != 0)
866                 return 0;
867
868         /* already started: it is an error */
869         ERROR("The WS binding %s is not a startable service", api->path);
870         return -1;
871 }
872
873 /*  */
874 static void api_ws_client_disconnect(struct api_ws *api)
875 {
876         if (api->fd >= 0) {
877                 afb_ws_destroy(api->client.ws);
878                 api->client.ws = NULL;
879                 close(api->fd);
880                 api->fd = -1;
881         }
882 }
883
884 /*  */
885 static int api_ws_client_connect(struct api_ws *api)
886 {
887         struct afb_ws *ws;
888         int fd;
889
890         fd = api_ws_socket(api->path, 0);
891         if (fd >= 0) {
892                 ws = afb_ws_create(afb_common_get_event_loop(), fd, &api_ws_client_ws_itf, api);
893                 if (ws != NULL) {
894                         api->client.ws = ws;
895                         api->fd = fd;
896                         return 0;
897                 }
898                 close(fd);
899         }
900         return -1;
901 }
902
903 /* adds a afb-ws-service client api */
904 int afb_api_ws_add_client(const char *path)
905 {
906         int rc;
907         struct api_ws *api;
908         struct afb_api afb_api;
909
910         /* create the ws client api */
911         api = api_ws_make(path);
912         if (api == NULL)
913                 goto error;
914
915         /* connect to the service */
916         rc = api_ws_client_connect(api);
917         if (rc < 0) {
918                 ERROR("can't connect to ws service %s", api->path);
919                 goto error2;
920         }
921
922         /* record it as an API */
923         afb_api.closure = api;
924         afb_api.call = api_ws_client_call_cb;
925         afb_api.service_start = api_ws_service_start_cb;
926         if (afb_apis_add(api->api, afb_api) < 0)
927                 goto error3;
928
929         return 0;
930
931 error3:
932         api_ws_client_disconnect(api);
933 error2:
934         free(api);
935 error:
936         return -1;
937 }
938
939 /******************* client description part for server *****************************/
940
941 static void api_ws_server_client_unref(struct api_ws_client *client)
942 {
943         if (!--client->refcount) {
944                 afb_evt_listener_unref(client->listener);
945                 afb_ws_destroy(client->ws);
946                 free(client);
947         }
948 }
949
950 /* on call, propagate it to the ws service */
951 static void api_ws_server_called(struct api_ws_client *client, struct readbuf *rb, char *data, size_t size)
952 {
953         struct api_ws_server_req *wreq;
954         struct afb_req areq;
955         const char *uuid, *verb;
956         uint32_t flags;
957
958         client->refcount++;
959
960         /* create the request */
961         wreq = calloc(1 , sizeof *wreq);
962         if (wreq == NULL)
963                 goto out_of_memory;
964
965         wreq->client = client;
966         wreq->rcvdata = data;
967         wreq->refcount = 1;
968
969         /* reads the call message data */
970         if (!api_ws_read_uint32(rb, &wreq->msgid)
971          || !api_ws_read_uint32(rb, &flags)
972          || !api_ws_read_string(rb, &verb, NULL)
973          || !api_ws_read_string(rb, &uuid, NULL)
974          || !api_ws_read_string(rb, &wreq->request, &wreq->lenreq))
975                 goto overflow;
976
977         /* init the context */
978         if (afb_context_connect(&wreq->context, uuid, NULL) < 0)
979                 goto out_of_memory;
980         wreq->context.flags = flags;
981
982         /* makes the call */
983         areq.itf = &afb_api_ws_req_itf;
984         areq.closure = wreq;
985         afb_apis_call(areq, &wreq->context, client->api, verb);
986         api_ws_server_req_unref(wreq);
987         return;
988
989 out_of_memory:
990 overflow:
991         free(wreq);
992         free(data);
993         api_ws_server_client_unref(client);
994 }
995
996 /* callback when receiving binary data */
997 static void api_ws_server_on_binary(void *closure, char *data, size_t size)
998 {
999         struct readbuf rb = { .head = data, .end = data + size };
1000         api_ws_server_called(closure, &rb, data, size);
1001 }
1002
1003 /* callback when receiving a hangup */
1004 static void api_ws_server_on_hangup(void *closure)
1005 {
1006         struct api_ws_client *client = closure;
1007
1008         /* close the socket */
1009         if (client->fd >= 0) {
1010                 close(client->fd);
1011                 client->fd = -1;
1012         }
1013
1014         /* release the client */
1015         api_ws_server_client_unref(client);
1016 }
1017
1018 static void api_ws_server_accept(struct api_ws *api)
1019 {
1020         struct api_ws_client *client;
1021         struct sockaddr addr;
1022         socklen_t lenaddr;
1023
1024         client = calloc(1, sizeof *client);
1025         if (client != NULL) {
1026                 client->listener = afb_evt_listener_create(&api_ws_server_evt_itf, client);
1027                 if (client->listener != NULL) {
1028                         lenaddr = (socklen_t)sizeof addr;
1029                         client->fd = accept(api->fd, &addr, &lenaddr);
1030                         if (client->fd >= 0) {
1031                                 fcntl(client->fd, F_SETFD, FD_CLOEXEC);
1032                                 fcntl(client->fd, F_SETFL, O_NONBLOCK);
1033                                 client->ws = afb_ws_create(afb_common_get_event_loop(), client->fd, &api_ws_server_ws_itf, client);
1034                                 if (client->ws != NULL) {
1035                                         client->api = api->api;
1036                                         client->refcount = 1;
1037                                         return;
1038                                 }
1039                                 close(client->fd);
1040                         }
1041                         afb_evt_listener_unref(client->listener);
1042                 }
1043                 free(client);
1044         }
1045 }
1046
1047 /******************* server part: manage events **********************************/
1048
1049 static void api_ws_server_event_send(struct api_ws_client *client, char order, const char *event, int eventid, const char *data)
1050 {
1051         int rc;
1052         struct writebuf wb = { .count = 0 };
1053
1054         if (api_ws_write_char(&wb, order)
1055          && api_ws_write_uint32(&wb, eventid)
1056          && api_ws_write_string(&wb, event)
1057          && (data == NULL || api_ws_write_string(&wb, data))) {
1058                 rc = afb_ws_binary_v(client->ws, wb.iovec, wb.count);
1059                 if (rc >= 0)
1060                         return;
1061         }
1062         ERROR("error while sending %c for event %s", order, event);
1063 }
1064
/* Listener callback: sends a '+' message (no data) for the event to the client 'closure' */
static void api_ws_server_event_add(void *closure, const char *event, int eventid)
{
        struct api_ws_client *client = closure;

        api_ws_server_event_send(client, '+', event, eventid, NULL);
}
1069
/* Listener callback: sends a '-' message (no data) for the event to the client 'closure' */
static void api_ws_server_event_remove(void *closure, const char *event, int eventid)
{
        struct api_ws_client *client = closure;

        api_ws_server_event_send(client, '-', event, eventid, NULL);
}
1074
1075 static void api_ws_server_event_push(void *closure, const char *event, int eventid, struct json_object *object)
1076 {
1077         const char *data = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
1078         api_ws_server_event_send(closure, '!', event, eventid, data ? : "null");
1079         json_object_put(object);
1080 }
1081
1082 static void api_ws_server_event_broadcast(void *closure, const char *event, int eventid, struct json_object *object)
1083 {
1084         int rc;
1085         struct api_ws_client *client = closure;
1086
1087         struct writebuf wb = { .count = 0 };
1088
1089         if (api_ws_write_char(&wb, '*') && api_ws_write_string(&wb, event) && api_ws_write_object(&wb, object)) {
1090                 rc = afb_ws_binary_v(client->ws, wb.iovec, wb.count);
1091                 if (rc < 0)
1092                         ERROR("error while broadcasting event %s", event);
1093         } else
1094                 ERROR("error while broadcasting event %s", event);
1095         json_object_put(object);
1096 }
1097
1098 /******************* ws request part for server *****************/
1099
1100 /* increment the reference count of the request */
1101 static void api_ws_server_req_addref_cb(void *closure)
1102 {
1103         struct api_ws_server_req *wreq = closure;
1104         wreq->refcount++;
1105 }
1106
/*
 * Request interface: drops one reference on the request 'closure';
 * the actual work is done by api_ws_server_req_unref.
 */
static void api_ws_server_req_unref_cb(void *closure)
{
        struct api_ws_server_req *wreq = closure;

        api_ws_server_req_unref(wreq);
}
1112
1113 static void api_ws_server_req_unref(struct api_ws_server_req *wreq)
1114 {
1115         if (wreq == NULL || --wreq->refcount)
1116                 return;
1117
1118         afb_context_disconnect(&wreq->context);
1119         json_object_put(wreq->json);
1120         free(wreq->rcvdata);
1121         api_ws_server_client_unref(wreq->client);
1122         free(wreq);
1123 }
1124
/* Request interface: returns the JSON object of the request 'closure' */
static struct json_object *api_ws_server_req_json_cb(void *closure)
{
        struct api_ws_server_req *wreq = closure;

        return api_ws_server_req_json(wreq);
}
1130
1131 static struct json_object *api_ws_server_req_json(struct api_ws_server_req *wreq)
1132 {
1133         if (wreq->json == NULL) {
1134                 wreq->json = json_tokener_parse(wreq->request);
1135                 if (wreq->json == NULL && strcmp(wreq->request, "null")) {
1136                         /* lazy error detection of json request. Is it to improve? */
1137                         wreq->json = json_object_new_string(wreq->request);
1138                 }
1139         }
1140         return wreq->json;
1141 }
1142
1143 /* get the argument of the request of 'name' */
1144 static struct afb_arg api_ws_server_req_get_cb(void *closure, const char *name)
1145 {
1146         struct api_ws_server_req *wreq = closure;
1147         return afb_msg_json_get_arg(api_ws_server_req_json(wreq), name);
1148 }
1149
1150 static void api_ws_server_req_success_cb(void *closure, struct json_object *obj, const char *info)
1151 {
1152         int rc;
1153         struct writebuf wb = { .count = 0 };
1154         struct api_ws_server_req *wreq = closure;
1155
1156         if (api_ws_write_char(&wb, 'T')
1157          && api_ws_write_uint32(&wb, wreq->msgid)
1158          && api_ws_write_uint32(&wb, (uint32_t)wreq->context.flags)
1159          && api_ws_write_string(&wb, info ? : "")
1160          && api_ws_write_object(&wb, obj)) {
1161                 rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1162                 if (rc >= 0)
1163                         goto success;
1164         }
1165         ERROR("error while sending success");
1166 success:
1167         json_object_put(obj);
1168 }
1169
1170 static void api_ws_server_req_fail_cb(void *closure, const char *status, const char *info)
1171 {
1172         int rc;
1173         struct writebuf wb = { .count = 0 };
1174         struct api_ws_server_req *wreq = closure;
1175
1176         if (api_ws_write_char(&wb, 'F')
1177          && api_ws_write_uint32(&wb, wreq->msgid)
1178          && api_ws_write_uint32(&wb, (uint32_t)wreq->context.flags)
1179          && api_ws_write_string(&wb, status)
1180          && api_ws_write_string(&wb, info ? : "")) {
1181                 rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1182                 if (rc >= 0)
1183                         return;
1184         }
1185         ERROR("error while sending fail");
1186 }
1187
1188 static const char *api_ws_server_req_raw_cb(void *closure, size_t *size)
1189 {
1190         struct api_ws_server_req *wreq = closure;
1191         if (size != NULL)
1192                 *size = wreq->lenreq;
1193         return wreq->request;
1194 }
1195
1196 static void api_ws_server_req_send_cb(void *closure, const char *buffer, size_t size)
1197 {
1198         /* TODO: how to put sized buffer as strings? things aren't clear here!!! */
1199         int rc;
1200         struct writebuf wb = { .count = 0 };
1201         struct api_ws_server_req *wreq = closure;
1202
1203         if (api_ws_write_char(&wb, 'X')
1204          && api_ws_write_uint32(&wb, wreq->msgid)
1205          && api_ws_write_uint32(&wb, (uint32_t)wreq->context.flags)
1206          && api_ws_write_string_length(&wb, buffer, size)) {
1207                 rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1208                 if (rc >= 0)
1209                         return;
1210         }
1211         ERROR("error while sending raw");
1212 }
1213
1214 static int api_ws_server_req_subscribe_cb(void *closure, struct afb_event event)
1215 {
1216         int rc, rc2;
1217         struct writebuf wb = { .count = 0 };
1218         struct api_ws_server_req *wreq = closure;
1219
1220         rc = afb_evt_add_watch(wreq->client->listener, event);
1221         if (rc < 0)
1222                 return rc;
1223
1224         if (api_ws_write_char(&wb, 'S')
1225          && api_ws_write_uint32(&wb, wreq->msgid)
1226          && api_ws_write_uint32(&wb, (uint32_t)afb_evt_event_id(event))
1227          && api_ws_write_string(&wb, afb_evt_event_name(event))) {
1228                 rc2 = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1229                 if (rc2 >= 0)
1230                         goto success;
1231         }
1232         ERROR("error while subscribing event");
1233 success:
1234         return rc;
1235 }
1236
1237 static int api_ws_server_req_unsubscribe_cb(void *closure, struct afb_event event)
1238 {
1239         int rc, rc2;
1240         struct writebuf wb = { .count = 0 };
1241         struct api_ws_server_req *wreq = closure;
1242
1243         if (api_ws_write_char(&wb, 'U')
1244          && api_ws_write_uint32(&wb, wreq->msgid)
1245          && api_ws_write_uint32(&wb, (uint32_t)afb_evt_event_id(event))
1246          && api_ws_write_string(&wb, afb_evt_event_name(event))) {
1247                 rc2 = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1248                 if (rc2 >= 0)
1249                         goto success;
1250         }
1251         ERROR("error while subscribing event");
1252 success:
1253         rc = afb_evt_remove_watch(wreq->client->listener, event);
1254         return rc;
1255 }
1256
1257 static void api_ws_server_req_subcall_cb(void *closure, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure)
1258 {
1259         struct api_ws_server_req *wreq = closure;
1260         afb_subcall(&wreq->context, api, verb, args, callback, cb_closure, (struct afb_req){ .itf = &afb_api_ws_req_itf, .closure = wreq });
1261 }
1262
1263 /******************* server part **********************************/
1264
1265 static int api_ws_server_connect(struct api_ws *api);
1266
1267 static int api_ws_server_listen_callback(sd_event_source *src, int fd, uint32_t revents, void *closure)
1268 {
1269         if ((revents & EPOLLIN) != 0)
1270                 api_ws_server_accept(closure);
1271         if ((revents & EPOLLHUP) != 0)
1272                 api_ws_server_connect(closure);
1273         return 0;
1274 }
1275
1276 static void api_ws_server_disconnect(struct api_ws *api)
1277 {
1278         if (api->server.listensrc != NULL) {
1279                 sd_event_source_unref(api->server.listensrc);
1280                 api->server.listensrc = NULL;
1281         }
1282         if (api->fd >= 0) {
1283                 close(api->fd);
1284                 api->fd = -1;
1285         }
1286 }
1287
1288 static int api_ws_server_connect(struct api_ws *api)
1289 {
1290         int rc;
1291
1292         /* ensure disconnected */
1293         api_ws_server_disconnect(api);
1294
1295         /* request the service object name */
1296         api->fd = api_ws_socket(api->path, 1);
1297         if (api->fd < 0)
1298                 ERROR("can't create socket %s", api->path);
1299         else {
1300                 /* listen for service */
1301                 rc = sd_event_add_io(afb_common_get_event_loop(), &api->server.listensrc, api->fd, EPOLLIN, api_ws_server_listen_callback, api);
1302                 if (rc >= 0)
1303                         return 0;
1304                 close(api->fd);
1305                 errno = -rc;
1306                 ERROR("can't add ws object %s", api->path);
1307         }
1308         return -1;
1309 }
1310
/*
 * Creates the service: builds the ws api object for 'path' and starts
 * listening for clients.
 *
 * Returns 0 on success, -1 on failure (the api object is freed when
 * listening could not be started).
 */
int afb_api_ws_add_server(const char *path)
{
        struct api_ws *api;

        /* creates the ws api object */
        api = api_ws_make(path);
        if (api == NULL)
                return -1;

        /* connect for serving */
        if (api_ws_server_connect(api) < 0) {
                free(api);
                return -1;
        }

        return 0;
}
1334
1335