Switch API websocket to xreq
[src/app-framework-binder.git] / src / afb-api-ws.c
1 /*
2  * Copyright (C) 2015, 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19 #define NO_PLUGIN_VERBOSE_MACRO
20
21 #include <stdlib.h>
22 #include <string.h>
23 #include <assert.h>
24 #include <fcntl.h>
25 #include <unistd.h>
26 #include <errno.h>
27 #include <endian.h>
28 #include <netdb.h>
29 #include <sys/types.h>
30 #include <sys/socket.h>
31 #include <sys/un.h>
32
33 #include <json-c/json.h>
34 #include <systemd/sd-event.h>
35
36 #include <afb/afb-req-itf.h>
37
38 #include "afb-common.h"
39
40 #include "afb-session.h"
41 #include "afb-ws.h"
42 #include "afb-msg-json.h"
43 #include "afb-apis.h"
44 #include "afb-api-so.h"
45 #include "afb-context.h"
46 #include "afb-evt.h"
47 #include "afb-xreq.h"
48 #include "verbose.h"
49 #include "sd-fds.h"
50
/* types declared further down in this file */
struct api_ws_client;
struct api_ws_event;
struct api_ws_memo;
54
55
56
57 /*
58  */
59 struct api_ws
60 {
61         char *path;             /* path of the object for the API */
62         char *api;              /* api name of the interface */
63         int fd;                 /* file descriptor */
64         union {
65                 struct {
66                         uint32_t id;
67                         struct afb_ws *ws;
68                         struct api_ws_event *events;
69                         struct api_ws_memo *memos;
70                 } client;
71                 struct {
72                         sd_event_source *listensrc;
73                         struct afb_evt_listener *listener; /* listener for broadcasted events */
74                 } server;
75         };
76 };
77
/* NOTE(review): these codes are not referenced in the visible part of this file */
#define RETOK   1
#define RETERR  2
#define RETRAW  3
81
82 /******************* websocket interface for client part **********************************/
83
84 static void api_ws_client_on_binary(void *closure, char *data, size_t size);
85
86 static const struct afb_ws_itf api_ws_client_ws_itf =
87 {
88         .on_close = NULL,
89         .on_text = NULL,
90         .on_binary = api_ws_client_on_binary,
91         .on_error = NULL,
92         .on_hangup = NULL
93 };
94
95 /******************* event structures for server part **********************************/
96
97 static void api_ws_server_event_add(void *closure, const char *event, int eventid);
98 static void api_ws_server_event_remove(void *closure, const char *event, int eventid);
99 static void api_ws_server_event_push(void *closure, const char *event, int eventid, struct json_object *object);
100 static void api_ws_server_event_broadcast(void *closure, const char *event, int eventid, struct json_object *object);
101
102 /* the interface for events pushing */
103 static const struct afb_evt_itf api_ws_server_evt_itf = {
104         .broadcast = api_ws_server_event_broadcast,
105         .push = api_ws_server_event_push,
106         .add = api_ws_server_event_add,
107         .remove = api_ws_server_event_remove
108 };
109
110 /******************* client description part for server *****************************/
111
/* description of one client connected to the server side */
struct api_ws_client
{
        const char *api;                        /* api name recorded in the requests of this client */
        int refcount;                           /* count of references, the client is released at zero */
        struct afb_evt_listener *listener;      /* listener for the events */
        int fd;                                 /* file descriptor of the accepted connection */
        struct afb_ws *ws;                      /* websocket of the connection */
};
129
/******************* websocket interface for server part **********************************/
131
132 static void api_ws_server_on_binary(void *closure, char *data, size_t size);
133 static void api_ws_server_on_hangup(void *closure);
134
135 static const struct afb_ws_itf api_ws_server_ws_itf =
136 {
137         .on_close = NULL,
138         .on_text = NULL,
139         .on_binary = api_ws_server_on_binary,
140         .on_error = NULL,
141         .on_hangup = api_ws_server_on_hangup
142 };
143
144 /******************* ws request part for server *****************/
145
146 /*
147  * structure for a ws request
148  */
149 struct api_ws_server_req {
150         struct afb_xreq xreq;           /* the xreq */
151         struct api_ws_client *client;   /* the client of the request */
152         char *rcvdata;                  /* the received data to free */
153         const char *request;            /* the readen request as string */
154         size_t lenreq;                  /* the length of the request */
155         uint32_t msgid;                 /* the incoming request msgid */
156 };
157
158 static void api_ws_server_req_success_cb(void *closure, struct json_object *obj, const char *info);
159 static void api_ws_server_req_fail_cb(void *closure, const char *status, const char *info);
160 static void api_ws_server_req_destroy_cb(void *closure);
161 static int api_ws_server_req_subscribe_cb(void *closure, struct afb_event event);
162 static int api_ws_server_req_unsubscribe_cb(void *closure, struct afb_event event);
163
164 const struct afb_xreq_query_itf afb_api_ws_xreq_itf = {
165         .success = api_ws_server_req_success_cb,
166         .fail = api_ws_server_req_fail_cb,
167         .unref = api_ws_server_req_destroy_cb,
168         .subscribe = api_ws_server_req_subscribe_cb,
169         .unsubscribe = api_ws_server_req_unsubscribe_cb
170 };
171
172 /******************* common part **********************************/
173
174 /*
175  * create a structure api_ws not connected to the 'path'.
176  */
177 static struct api_ws *api_ws_make(const char *path)
178 {
179         struct api_ws *api;
180         size_t length;
181
182         /* allocates the structure */
183         length = strlen(path);
184         api = calloc(1, sizeof *api + 1 + length);
185         if (api == NULL) {
186                 errno = ENOMEM;
187                 goto error;
188         }
189
190         /* path is copied after the struct */
191         api->path = (char*)(api+1);
192         memcpy(api->path, path, length + 1);
193
194         /* api name is at the end of the path */
195         while (length && path[length - 1] != '/' && path[length - 1] != ':')
196                 length = length - 1;
197         api->api = &api->path[length];
198         if (api->api == NULL || !afb_apis_is_valid_api_name(api->api)) {
199                 errno = EINVAL;
200                 goto error2;
201         }
202
203         api->fd = -1;
204         return api;
205
206 error2:
207         free(api);
208 error:
209         return NULL;
210 }
211
/*
 * Creates the unix stream socket for 'path'.
 *
 * When 'server' isn't zero, any existing entry at 'path' is unlinked
 * and the socket is bound to 'path'. Otherwise the socket is connected
 * to 'path'.
 *
 * Returns the file descriptor of the socket or a negative value on
 * error, with errno set (ENAMETOOLONG when 'path' doesn't fit the
 * socket address).
 */
static int api_ws_socket_unix(const char *path, int server)
{
        int fd, rc, saved;
        struct sockaddr_un addr;
        size_t length;

        /* the path and its terminating zero must fit the address */
        length = strlen(path);
        if (length >= sizeof addr.sun_path) {
                errno = ENAMETOOLONG;
                return -1;
        }

        if (server)
                unlink(path);

        fd = socket(AF_UNIX, SOCK_STREAM, 0);
        if (fd < 0)
                return fd;

        memset(&addr, 0, sizeof addr);
        addr.sun_family = AF_UNIX;
        memcpy(addr.sun_path, path, length + 1);
        if (server) {
                rc = bind(fd, (struct sockaddr *) &addr, (socklen_t)(sizeof addr));
        } else {
                rc = connect(fd, (struct sockaddr *) &addr, (socklen_t)(sizeof addr));
        }
        if (rc < 0) {
                /* keeps the error of bind/connect, not the one of close */
                saved = errno;
                close(fd);
                errno = saved;
                return rc;
        }
        return fd;
}
245
/*
 * Creates the IPv4 stream socket for 'path'.
 *
 * 'path' is expected to be of the form HOST:SERVICE/API: the host is
 * what precedes the last ':' and the service is what stands between
 * the last ':' and the last '/'.
 *
 * When 'server' isn't zero, the socket is bound to the address
 * (SO_REUSEADDR being set before binding). Otherwise the socket is
 * connected to it. Each resolved address is tried in turn.
 *
 * Returns the file descriptor of the socket or -1 on error, with errno
 * set to EINVAL when 'path' is malformed or can't be resolved.
 */
static int api_ws_socket_inet(const char *path, int server)
{
        int rc, fd, on, saved;
        const char *service, *host, *api;
        struct addrinfo hint, *rai, *iai;

        /* scan the uri */
        api = strrchr(path, '/');
        service = strrchr(path, ':');
        if (api == NULL || service == NULL || api < service) {
                errno = EINVAL;
                return -1;
        }
        host = strndupa(path, service++ - path);
        service = strndupa(service, api - service);

        /* get addr */
        memset(&hint, 0, sizeof hint);
        hint.ai_family = AF_INET;
        hint.ai_socktype = SOCK_STREAM;
        rc = getaddrinfo(host, service, &hint, &rai);
        if (rc != 0) {
                errno = EINVAL;
                return -1;
        }

        /* get the socket */
        saved = errno;
        for (iai = rai ; iai != NULL ; iai = iai->ai_next) {
                fd = socket(iai->ai_family, iai->ai_socktype, iai->ai_protocol);
                if (fd < 0) {
                        saved = errno;
                        continue;
                }
                if (server) {
                        /* the option only influences bind when set before it */
                        on = 1;
                        setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof on);
                        rc = bind(fd, iai->ai_addr, iai->ai_addrlen);
                } else {
                        rc = connect(fd, iai->ai_addr, iai->ai_addrlen);
                }
                if (rc == 0) {
                        freeaddrinfo(rai);
                        return fd;
                }
                /* records the error of bind/connect before closing */
                saved = errno;
                close(fd);
        }
        freeaddrinfo(rai);
        errno = saved;
        return -1;
}
294
/*
 * Gets the socket described by 'path'.
 *
 * - "sd:NAME" gets the file descriptor from sd_fds_for(NAME)
 * - "unix:PATH" creates a unix socket
 * - anything else is treated as an inet socket specification
 *
 * For unix and inet sockets, when 'server' isn't zero, the socket is
 * put in listening state. The returned socket has the flag FD_CLOEXEC
 * and the status flag O_NONBLOCK set.
 *
 * Returns the file descriptor or a negative value on error.
 */
static int api_ws_socket(const char *path, int server)
{
        int fd, rc, saved;

        /* check for systemd socket */
        if (0 == strncmp(path, "sd:", 3))
                fd = sd_fds_for(path + 3);
        else {
                /* check for unix socket */
                if (0 == strncmp(path, "unix:", 5))
                        /* unix socket */
                        fd = api_ws_socket_unix(path + 5, server);
                else
                        /* inet socket */
                        fd = api_ws_socket_inet(path, server);

                if (fd >= 0 && server) {
                        /* NOTE(review): at this point the helpers have already bound the socket */
                        rc = 1;
                        setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &rc, sizeof rc);
                        /* a server socket that can't listen is of no use */
                        if (listen(fd, 5) < 0) {
                                saved = errno;
                                close(fd);
                                errno = saved;
                                return -1;
                        }
                }
        }
        /* configure the socket */
        if (fd >= 0) {
                fcntl(fd, F_SETFD, FD_CLOEXEC);
                fcntl(fd, F_SETFL, O_NONBLOCK);
        }
        return fd;
}
324
325 /******************* serialisation part **********************************/
326
/* cursor for decoding a received message */
struct readbuf
{
        char *head;     /* next byte to read */
        char *end;      /* first byte after the message */
};
331
/* maximum count of parts of a message to send */
#define WRITEBUF_COUNT_MAX  32

/* accumulator of the parts of a message to send */
struct writebuf
{
        struct iovec iovec[WRITEBUF_COUNT_MAX];  /* the parts, in sending order */
        uint32_t uints[WRITEBUF_COUNT_MAX];      /* storage of the encoded integers and chars, same index as their part */
        int count;                               /* count of parts currently recorded */
};
339
340 static char *api_ws_read_get(struct readbuf *rb, uint32_t length)
341 {
342         char *before = rb->head;
343         char *after = before + length;
344         if (after > rb->end)
345                 return 0;
346         rb->head = after;
347         return before;
348 }
349
350 static int api_ws_read_uint32(struct readbuf *rb, uint32_t *value)
351 {
352         char *after = rb->head + sizeof *value;
353         if (after > rb->end)
354                 return 0;
355         memcpy(value, rb->head, sizeof *value);
356         rb->head = after;
357         *value = le32toh(*value);
358         return 1;
359 }
360
361 static int api_ws_read_string(struct readbuf *rb, const char **value, size_t *length)
362 {
363         uint32_t len;
364         if (!api_ws_read_uint32(rb, &len) || !len)
365                 return 0;
366         if (length)
367                 *length = (size_t)(len - 1);
368         return (*value = api_ws_read_get(rb, len)) != NULL &&  rb->head[-1] == 0;
369 }
370
/*
 * Reads from 'rb' a string and parses it as JSON into 'object'.
 *
 * Returns 1 when the string is read and either the parsing gave an
 * object or the string is exactly "null" and the parsing gave NULL.
 * Returns 0 otherwise.
 */
static int api_ws_read_object(struct readbuf *rb, struct json_object **object)
{
        const char *text;
        size_t length;
        int isnull;

        if (!api_ws_read_string(rb, &text, &length))
                return 0;
        *object = json_tokener_parse(text);
        isnull = strcmp(text, "null") == 0;
        /* NULL is expected if and only if the text is "null" */
        return (*object == NULL) == isnull;
}
377
378 static int api_ws_write_put(struct writebuf *wb, const void *value, size_t length)
379 {
380         int i = wb->count;
381         if (i == WRITEBUF_COUNT_MAX)
382                 return 0;
383         wb->iovec[i].iov_base = (void*)value;
384         wb->iovec[i].iov_len = length;
385         wb->count = i + 1;
386         return 1;
387 }
388
389 static int api_ws_write_char(struct writebuf *wb, char value)
390 {
391         int i = wb->count;
392         if (i == WRITEBUF_COUNT_MAX)
393                 return 0;
394         *(char*)&wb->uints[i] = value;
395         wb->iovec[i].iov_base = &wb->uints[i];
396         wb->iovec[i].iov_len = 1;
397         wb->count = i + 1;
398         return 1;
399 }
400
401 static int api_ws_write_uint32(struct writebuf *wb, uint32_t value)
402 {
403         int i = wb->count;
404         if (i == WRITEBUF_COUNT_MAX)
405                 return 0;
406         wb->uints[i] = htole32(value);
407         wb->iovec[i].iov_base = &wb->uints[i];
408         wb->iovec[i].iov_len = sizeof wb->uints[i];
409         wb->count = i + 1;
410         return 1;
411 }
412
/*
 * Appends to 'wb' the string 'value' of 'length': a 32 bits size
 * followed by 'length' + 1 bytes taken from 'value'.
 * NOTE(review): the extra byte is presumably the terminating zero of
 * 'value' - confirm against callers.
 *
 * Returns 1 on success or 0 when 'length' + 1 doesn't fit 32 bits or
 * when 'wb' is full.
 */
static int api_ws_write_string_length(struct writebuf *wb, const char *value, size_t length)
{
        size_t total = length + 1;
        uint32_t size = (uint32_t)total;

        /* rejects the sizes truncated by the conversion to 32 bits */
        if ((size_t)size != total || size == 0)
                return 0;
        if (!api_ws_write_uint32(wb, size))
                return 0;
        return api_ws_write_put(wb, value, total);
}
418
/*
 * Appends to 'wb' the zero terminated string 'value'.
 *
 * Returns 1 on success or 0 on failure.
 */
static int api_ws_write_string(struct writebuf *wb, const char *value)
{
        size_t length = strlen(value);

        return api_ws_write_string_length(wb, value, length);
}
423
424 static int api_ws_write_object(struct writebuf *wb, struct json_object *object)
425 {
426         const char *string = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
427         return string != NULL && api_ws_write_string(wb, string);
428 }
429
430
431
432
433 /******************* client part **********************************/
434
/*
 * Record of a request sent to the service and waiting for its reply.
 * The records of an api are linked in the list api->client.memos.
 */
struct api_ws_memo {
        struct api_ws_memo *next;       /* next record of the list */
        struct api_ws *api;             /* the api owning the record */
        struct afb_xreq *xreq;          /* the request, a reference is held */
        uint32_t msgid;                 /* identifier of the message sent */
};
448
449 struct api_ws_event
450 {
451         struct api_ws_event *next;
452         struct afb_event event;
453         int eventid;
454         int refcount;
455 };
456
457 /* search a memorized request */
458 static struct api_ws_memo *api_ws_client_memo_search(struct api_ws *api, uint32_t msgid)
459 {
460         struct api_ws_memo *memo;
461
462         memo = api->client.memos;
463         while (memo != NULL && memo->msgid != msgid)
464                 memo = memo->next;
465
466         return memo;
467 }
468
469 /* search the event */
470 static struct api_ws_event *api_ws_client_event_search(struct api_ws *api, uint32_t eventid, const char *name)
471 {
472         struct api_ws_event *ev;
473
474         ev = api->client.events;
475         while (ev != NULL && (ev->eventid != eventid || 0 != strcmp(afb_evt_event_name(ev->event), name)))
476                 ev = ev->next;
477
478         return ev;
479 }
480
481
482 /* allocates and init the memorizing data */
483 static struct api_ws_memo *api_ws_client_memo_make(struct api_ws *api, struct afb_xreq *xreq)
484 {
485         struct api_ws_memo *memo;
486
487         memo = malloc(sizeof *memo);
488         if (memo != NULL) {
489                 afb_xreq_addref(xreq);
490                 memo->xreq = xreq;
491                 do { memo->msgid = ++api->client.id; } while(api_ws_client_memo_search(api, memo->msgid) != NULL);
492                 memo->api = api;
493                 memo->next = api->client.memos;
494                 api->client.memos = memo;
495         }
496         return memo;
497 }
498
499 /* free and release the memorizing data */
500 static void api_ws_client_memo_destroy(struct api_ws_memo *memo)
501 {
502         struct api_ws_memo **prv;
503
504         prv = &memo->api->client.memos;
505         while (*prv != NULL) {
506                 if (*prv == memo) {
507                         *prv = memo->next;
508                         break;
509                 }
510                 prv = &(*prv)->next;
511         }
512
513         afb_xreq_unref(memo->xreq);
514         free(memo);
515 }
516
/*
 * Reads from 'rb' the data of an event: its identifier then its name.
 * Returns 1 on success or 0 on failure.
 */
static int api_ws_client_msg_event_read(struct readbuf *rb, uint32_t *eventid, const char **name)
{
        if (!api_ws_read_uint32(rb, eventid))
                return 0;
        return api_ws_read_string(rb, name, NULL);
}
522
/*
 * Reads from 'rb' the data of an event and stores in 'ev' the
 * matching event record of 'api'.
 * Returns 1 on success or 0, after logging an error, when the message
 * is invalid or when the event isn't known.
 */
static int api_ws_client_msg_event_get(struct api_ws *api, struct readbuf *rb, struct api_ws_event **ev)
{
        uint32_t eventid;
        const char *name;
        struct api_ws_event *found;

        /* reads the identifier and the name */
        if (!api_ws_client_msg_event_read(rb, &eventid, &name)) {
                ERROR("Invalid message");
                return 0;
        }

        /* searches the record */
        found = api_ws_client_event_search(api, eventid, name);
        *ev = found;
        if (found == NULL) {
                ERROR("event %s not found", name);
                return 0;
        }

        return 1;
}
544
/*
 * Reads from 'rb' a message identifier and stores in 'memo' the
 * matching pending request of 'api'.
 * Returns 1 on success or 0, after logging an error, when the message
 * is invalid or when no pending request matches.
 */
static int api_ws_client_msg_memo_get(struct api_ws *api, struct readbuf *rb, struct api_ws_memo **memo)
{
        uint32_t msgid;
        struct api_ws_memo *found;

        /* reads the message identifier */
        if (!api_ws_read_uint32(rb, &msgid)) {
                ERROR("Invalid message");
                return 0;
        }

        /* searches the pending request */
        found = api_ws_client_memo_search(api, msgid);
        *memo = found;
        if (found == NULL) {
                ERROR("message not found");
                return 0;
        }

        return 1;
}
565
/*
 * Reads from 'rb' a subscription message: the pending request is read
 * first (into 'memo') then the event (into 'ev').
 * Returns 1 on success or 0 on failure.
 */
static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_event **ev, struct api_ws_memo **memo)
{
        if (!api_ws_client_msg_memo_get(api, rb, memo))
                return 0;
        return api_ws_client_msg_event_get(api, rb, ev);
}
571
572 /* adds an event */
573 static void api_ws_client_event_create(struct api_ws *api, struct readbuf *rb)
574 {
575         size_t offset;
576         const char *name;
577         uint32_t eventid;
578         struct api_ws_event *ev;
579
580         /* get event data from the message */
581         offset = api_ws_client_msg_event_read(rb, &eventid, &name);
582         if (offset == 0) {
583                 ERROR("Invalid message");
584                 return;
585         }
586
587         /* check conflicts */
588         ev = api_ws_client_event_search(api, eventid, name);
589         if (ev != NULL) {
590                 ev->refcount++;
591                 return;
592         }
593
594         /* no conflict, try to add it */
595         ev = malloc(sizeof *ev);
596         if (ev != NULL) {
597                 ev->event = afb_evt_create_event(name);
598                 if (ev->event.closure == NULL)
599                         free(ev);
600                 else {
601                         ev->refcount = 1;
602                         ev->eventid = eventid;
603                         ev->next = api->client.events;
604                         api->client.events = ev;
605                         return;
606                 }
607         }
608         ERROR("can't create event %s, out of memory", name);
609 }
610
611 /* removes an event */
612 static void api_ws_client_event_drop(struct api_ws *api, struct readbuf *rb)
613 {
614         struct api_ws_event *ev, **prv;
615
616         /* retrieves the event */
617         if (!api_ws_client_msg_event_get(api, rb, &ev))
618                 return;
619
620         /* decrease the reference count */
621         if (--ev->refcount)
622                 return;
623
624         /* unlinks the event */
625         prv = &api->client.events;
626         while (*prv != ev)
627                 prv = &(*prv)->next;
628         *prv = ev->next;
629
630         /* destroys the event */
631         afb_event_drop(ev->event);
632         free(ev);
633 }
634
635 /* subscribes an event */
636 static void api_ws_client_event_subscribe(struct api_ws *api, struct readbuf *rb)
637 {
638         struct api_ws_event *ev;
639         struct api_ws_memo *memo;
640
641         if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) {
642                 /* subscribe the request from the event */
643                 if (afb_xreq_subscribe(memo->xreq, ev->event) < 0)
644                         ERROR("can't subscribe: %m");
645         }
646 }
647
648 /* unsubscribes an event */
649 static void api_ws_client_event_unsubscribe(struct api_ws *api, struct readbuf *rb)
650 {
651         struct api_ws_event *ev;
652         struct api_ws_memo *memo;
653
654         if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) {
655                 /* unsubscribe the request from the event */
656                 if (afb_xreq_unsubscribe(memo->xreq, ev->event) < 0)
657                         ERROR("can't unsubscribe: %m");
658         }
659 }
660
/*
 * Handles the message of a broadcasted event: reads the name of the
 * event then its JSON object and broadcasts it locally.
 */
static void api_ws_client_event_broadcast(struct api_ws *api, struct readbuf *rb)
{
        const char *event;
        struct json_object *object;

        if (!api_ws_read_string(rb, &event, NULL) || !api_ws_read_object(rb, &object)) {
                ERROR("unreadable broadcasted event");
                return;
        }
        afb_evt_broadcast(event, object);
}
672
673 /* pushs an event */
674 static void api_ws_client_event_push(struct api_ws *api, struct readbuf *rb)
675 {
676         struct api_ws_event *ev;
677         struct json_object *object;
678
679         if (api_ws_client_msg_event_get(api, rb, &ev) && api_ws_read_object(rb, &object))
680                 afb_event_push(ev->event, object);
681         else
682                 ERROR("unreadable push event");
683 }
684
685 static void api_ws_client_reply_success(struct api_ws *api, struct readbuf *rb)
686 {
687         struct api_ws_memo *memo;
688         struct json_object *object;
689         const char *info;
690         uint32_t flags;
691
692         /* retrieve the message data */
693         if (!api_ws_client_msg_memo_get(api, rb, &memo))
694                 return;
695
696         if (api_ws_read_uint32(rb, &flags)
697          && api_ws_read_string(rb, &info, NULL)
698          && api_ws_read_object(rb, &object)) {
699                 memo->xreq->context.flags = (unsigned)flags;
700                 afb_xreq_success(memo->xreq, object, *info ? info : NULL);
701         } else {
702                 /* failing to have the answer */
703                 afb_xreq_fail(memo->xreq, "error", "ws error");
704         }
705         api_ws_client_memo_destroy(memo);
706 }
707
708 static void api_ws_client_reply_fail(struct api_ws *api, struct readbuf *rb)
709 {
710         struct api_ws_memo *memo;
711         const char *info, *status;
712         uint32_t flags;
713
714         /* retrieve the message data */
715         if (!api_ws_client_msg_memo_get(api, rb, &memo))
716                 return;
717
718         if (api_ws_read_uint32(rb, &flags)
719          && api_ws_read_string(rb, &status, NULL)
720          && api_ws_read_string(rb, &info, NULL)) {
721                 memo->xreq->context.flags = (unsigned)flags;
722                 afb_xreq_fail(memo->xreq, status, *info ? info : NULL);
723         } else {
724                 /* failing to have the answer */
725                 afb_xreq_fail(memo->xreq, "error", "ws error");
726         }
727         api_ws_client_memo_destroy(memo);
728 }
729
730 /* callback when receiving binary data */
731 static void api_ws_client_on_binary(void *closure, char *data, size_t size)
732 {
733         if (size > 0) {
734                 struct readbuf rb = { .head = data, .end = data + size };
735                 switch (*rb.head++) {
736                 case 'T': /* success */
737                         api_ws_client_reply_success(closure, &rb);
738                         break;
739                 case 'F': /* fail */
740                         api_ws_client_reply_fail(closure, &rb);
741                         break;
742                 case '*': /* broadcast */
743                         api_ws_client_event_broadcast(closure, &rb);
744                         break;
745                 case '+': /* creates the event */
746                         api_ws_client_event_create(closure, &rb);
747                         break;
748                 case '-': /* drops the event */
749                         api_ws_client_event_drop(closure, &rb);
750                         break;
751                 case '!': /* pushs the event */
752                         api_ws_client_event_push(closure, &rb);
753                         break;
754                 case 'S': /* subscribe event for a request */
755                         api_ws_client_event_subscribe(closure, &rb);
756                         break;
757                 case 'U': /* unsubscribe event for a request */
758                         api_ws_client_event_unsubscribe(closure, &rb);
759                         break;
760                 default: /* unexpected message */
761                         break;
762                 }
763         }
764         free(data);
765 }
766
767 /* on call, propagate it to the ws service */
768 static void api_ws_client_xcall_cb(void * closure, struct afb_xreq *xreq)
769 {
770         int rc;
771         struct api_ws_memo *memo;
772         struct writebuf wb = { .count = 0 };
773         const char *raw;
774         size_t szraw;
775         struct api_ws *api = closure;
776
777         /* create the recording data */
778         memo = api_ws_client_memo_make(api, xreq);
779         if (memo == NULL) {
780                 afb_xreq_fail_f(xreq, "error", "out of memory");
781                 return;
782         }
783
784         /* creates the call message */
785         raw = afb_xreq_raw(xreq, &szraw);
786         if (raw == NULL)
787                 goto internal_error;
788         if (!api_ws_write_uint32(&wb, memo->msgid)
789          || !api_ws_write_uint32(&wb, (uint32_t)xreq->context.flags)
790          || !api_ws_write_string(&wb, xreq->verb)
791          || !api_ws_write_string(&wb, afb_session_uuid(xreq->context.session))
792          || !api_ws_write_string_length(&wb, raw, szraw))
793                 goto overflow;
794
795         /* send */
796         rc = afb_ws_binary_v(api->client.ws, wb.iovec, wb.count);
797         if (rc < 0)
798                 goto ws_send_error;
799         return;
800
801 ws_send_error:
802         afb_xreq_fail(xreq, "error", "websocket sending error");
803         goto clean_memo;
804
805 internal_error:
806         afb_xreq_fail(xreq, "error", "internal: raw is NULL!");
807         goto clean_memo;
808
809 overflow:
810         afb_xreq_fail(xreq, "error", "overflow: size doesn't match 32 bits!");
811
812 clean_memo:
813         api_ws_client_memo_destroy(memo);
814 }
815
816 static int api_ws_service_start_cb(void *closure, int share_session, int onneed)
817 {
818         struct api_ws *api = closure;
819
820         /* not an error when onneed */
821         if (onneed != 0)
822                 return 0;
823
824         /* already started: it is an error */
825         ERROR("The WS binding %s is not a startable service", api->path);
826         return -1;
827 }
828
829 /*  */
830 static void api_ws_client_disconnect(struct api_ws *api)
831 {
832         if (api->fd >= 0) {
833                 afb_ws_destroy(api->client.ws);
834                 api->client.ws = NULL;
835                 close(api->fd);
836                 api->fd = -1;
837         }
838 }
839
840 /*  */
841 static int api_ws_client_connect(struct api_ws *api)
842 {
843         struct afb_ws *ws;
844         int fd;
845
846         fd = api_ws_socket(api->path, 0);
847         if (fd >= 0) {
848                 ws = afb_ws_create(afb_common_get_event_loop(), fd, &api_ws_client_ws_itf, api);
849                 if (ws != NULL) {
850                         api->client.ws = ws;
851                         api->fd = fd;
852                         return 0;
853                 }
854                 close(fd);
855         }
856         return -1;
857 }
858
859 /* adds a afb-ws-service client api */
860 int afb_api_ws_add_client(const char *path)
861 {
862         int rc;
863         struct api_ws *api;
864         struct afb_api afb_api;
865
866         /* create the ws client api */
867         api = api_ws_make(path);
868         if (api == NULL)
869                 goto error;
870
871         /* connect to the service */
872         rc = api_ws_client_connect(api);
873         if (rc < 0) {
874                 ERROR("can't connect to ws service %s", api->path);
875                 goto error2;
876         }
877
878         /* record it as an API */
879         afb_api.closure = api;
880         afb_api.xcall = api_ws_client_xcall_cb;
881         afb_api.service_start = api_ws_service_start_cb;
882         if (afb_apis_add(api->api, afb_api) < 0)
883                 goto error3;
884
885         return 0;
886
887 error3:
888         api_ws_client_disconnect(api);
889 error2:
890         free(api);
891 error:
892         return -1;
893 }
894
895 /******************* client description part for server *****************************/
896
897 static void api_ws_server_client_unref(struct api_ws_client *client)
898 {
899         if (!--client->refcount) {
900                 afb_evt_listener_unref(client->listener);
901                 afb_ws_destroy(client->ws);
902                 free(client);
903         }
904 }
905
906 /* on call, propagate it to the ws service */
907 static void api_ws_server_called(struct api_ws_client *client, struct readbuf *rb, char *data, size_t size)
908 {
909         struct api_ws_server_req *wreq;
910         const char *uuid, *verb;
911         uint32_t flags;
912
913         client->refcount++;
914
915         /* create the request */
916         wreq = calloc(1 , sizeof *wreq);
917         if (wreq == NULL)
918                 goto out_of_memory;
919
920         wreq->client = client;
921         wreq->rcvdata = data;
922
923         /* reads the call message data */
924         if (!api_ws_read_uint32(rb, &wreq->msgid)
925          || !api_ws_read_uint32(rb, &flags)
926          || !api_ws_read_string(rb, &verb, NULL)
927          || !api_ws_read_string(rb, &uuid, NULL)
928          || !api_ws_read_string(rb, &wreq->request, &wreq->lenreq))
929                 goto overflow;
930
931         wreq->xreq.json = json_tokener_parse(wreq->request);
932         if (wreq->xreq.json == NULL && strcmp(wreq->request, "null")) {
933                 wreq->xreq.json = json_object_new_string(wreq->request);
934         }
935
936         /* init the context */
937         if (afb_context_connect(&wreq->xreq.context, uuid, NULL) < 0)
938                 goto out_of_memory;
939         wreq->xreq.context.flags = flags;
940
941         /* makes the call */
942         wreq->xreq.refcount = 1;
943         wreq->xreq.api = client->api;
944         wreq->xreq.verb = verb;
945         wreq->xreq.query = wreq;
946         wreq->xreq.queryitf = &afb_api_ws_xreq_itf;
947         afb_apis_xcall(&wreq->xreq);
948         afb_xreq_unref(&wreq->xreq);
949         return;
950
951 out_of_memory:
952 overflow:
953         free(wreq);
954         free(data);
955         api_ws_server_client_unref(client);
956 }
957
958 /* callback when receiving binary data */
959 static void api_ws_server_on_binary(void *closure, char *data, size_t size)
960 {
961         struct readbuf rb = { .head = data, .end = data + size };
962         api_ws_server_called(closure, &rb, data, size);
963 }
964
965 /* callback when receiving a hangup */
966 static void api_ws_server_on_hangup(void *closure)
967 {
968         struct api_ws_client *client = closure;
969
970         /* close the socket */
971         if (client->fd >= 0) {
972                 close(client->fd);
973                 client->fd = -1;
974         }
975
976         /* release the client */
977         api_ws_server_client_unref(client);
978 }
979
980 static void api_ws_server_accept(struct api_ws *api)
981 {
982         struct api_ws_client *client;
983         struct sockaddr addr;
984         socklen_t lenaddr;
985
986         client = calloc(1, sizeof *client);
987         if (client != NULL) {
988                 client->listener = afb_evt_listener_create(&api_ws_server_evt_itf, client);
989                 if (client->listener != NULL) {
990                         lenaddr = (socklen_t)sizeof addr;
991                         client->fd = accept(api->fd, &addr, &lenaddr);
992                         if (client->fd >= 0) {
993                                 fcntl(client->fd, F_SETFD, FD_CLOEXEC);
994                                 fcntl(client->fd, F_SETFL, O_NONBLOCK);
995                                 client->ws = afb_ws_create(afb_common_get_event_loop(), client->fd, &api_ws_server_ws_itf, client);
996                                 if (client->ws != NULL) {
997                                         client->api = api->api;
998                                         client->refcount = 1;
999                                         return;
1000                                 }
1001                                 close(client->fd);
1002                         }
1003                         afb_evt_listener_unref(client->listener);
1004                 }
1005                 free(client);
1006         }
1007 }
1008
1009 /******************* server part: manage events **********************************/
1010
1011 static void api_ws_server_event_send(struct api_ws_client *client, char order, const char *event, int eventid, const char *data)
1012 {
1013         int rc;
1014         struct writebuf wb = { .count = 0 };
1015
1016         if (api_ws_write_char(&wb, order)
1017          && api_ws_write_uint32(&wb, eventid)
1018          && api_ws_write_string(&wb, event)
1019          && (data == NULL || api_ws_write_string(&wb, data))) {
1020                 rc = afb_ws_binary_v(client->ws, wb.iovec, wb.count);
1021                 if (rc >= 0)
1022                         return;
1023         }
1024         ERROR("error while sending %c for event %s", order, event);
1025 }
1026
/* Event listener callback: tells the client 'closure' that an event was added ('+' message). */
static void api_ws_server_event_add(void *closure, const char *event, int eventid)
{
	struct api_ws_client *client = closure;

	api_ws_server_event_send(client, '+', event, eventid, NULL);
}
1031
/* Event listener callback: tells the client 'closure' that an event was removed ('-' message). */
static void api_ws_server_event_remove(void *closure, const char *event, int eventid)
{
	struct api_ws_client *client = closure;

	api_ws_server_event_send(client, '-', event, eventid, NULL);
}
1036
1037 static void api_ws_server_event_push(void *closure, const char *event, int eventid, struct json_object *object)
1038 {
1039         const char *data = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
1040         api_ws_server_event_send(closure, '!', event, eventid, data ? : "null");
1041         json_object_put(object);
1042 }
1043
1044 static void api_ws_server_event_broadcast(void *closure, const char *event, int eventid, struct json_object *object)
1045 {
1046         int rc;
1047         struct api_ws_client *client = closure;
1048
1049         struct writebuf wb = { .count = 0 };
1050
1051         if (api_ws_write_char(&wb, '*') && api_ws_write_string(&wb, event) && api_ws_write_object(&wb, object)) {
1052                 rc = afb_ws_binary_v(client->ws, wb.iovec, wb.count);
1053                 if (rc < 0)
1054                         ERROR("error while broadcasting event %s", event);
1055         } else
1056                 ERROR("error while broadcasting event %s", event);
1057         json_object_put(object);
1058 }
1059
1060 /******************* ws request part for server *****************/
1061
1062 /* decrement the reference count of the request and free/release it on falling to null */
1063 static void api_ws_server_req_destroy_cb(void *closure)
1064 {
1065         struct api_ws_server_req *wreq = closure;
1066
1067         afb_context_disconnect(&wreq->xreq.context);
1068         json_object_put(wreq->xreq.json);
1069         free(wreq->rcvdata);
1070         api_ws_server_client_unref(wreq->client);
1071         free(wreq);
1072 }
1073
1074 static void api_ws_server_req_success_cb(void *closure, struct json_object *obj, const char *info)
1075 {
1076         int rc;
1077         struct writebuf wb = { .count = 0 };
1078         struct api_ws_server_req *wreq = closure;
1079
1080         if (api_ws_write_char(&wb, 'T')
1081          && api_ws_write_uint32(&wb, wreq->msgid)
1082          && api_ws_write_uint32(&wb, (uint32_t)wreq->xreq.context.flags)
1083          && api_ws_write_string(&wb, info ? : "")
1084          && api_ws_write_object(&wb, obj)) {
1085                 rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1086                 if (rc >= 0)
1087                         goto success;
1088         }
1089         ERROR("error while sending success");
1090 success:
1091         json_object_put(obj);
1092 }
1093
1094 static void api_ws_server_req_fail_cb(void *closure, const char *status, const char *info)
1095 {
1096         int rc;
1097         struct writebuf wb = { .count = 0 };
1098         struct api_ws_server_req *wreq = closure;
1099
1100         if (api_ws_write_char(&wb, 'F')
1101          && api_ws_write_uint32(&wb, wreq->msgid)
1102          && api_ws_write_uint32(&wb, (uint32_t)wreq->xreq.context.flags)
1103          && api_ws_write_string(&wb, status)
1104          && api_ws_write_string(&wb, info ? : "")) {
1105                 rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1106                 if (rc >= 0)
1107                         return;
1108         }
1109         ERROR("error while sending fail");
1110 }
1111
1112 static int api_ws_server_req_subscribe_cb(void *closure, struct afb_event event)
1113 {
1114         int rc, rc2;
1115         struct writebuf wb = { .count = 0 };
1116         struct api_ws_server_req *wreq = closure;
1117
1118         rc = afb_evt_add_watch(wreq->client->listener, event);
1119         if (rc < 0)
1120                 return rc;
1121
1122         if (api_ws_write_char(&wb, 'S')
1123          && api_ws_write_uint32(&wb, wreq->msgid)
1124          && api_ws_write_uint32(&wb, (uint32_t)afb_evt_event_id(event))
1125          && api_ws_write_string(&wb, afb_evt_event_name(event))) {
1126                 rc2 = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1127                 if (rc2 >= 0)
1128                         goto success;
1129         }
1130         ERROR("error while subscribing event");
1131 success:
1132         return rc;
1133 }
1134
1135 static int api_ws_server_req_unsubscribe_cb(void *closure, struct afb_event event)
1136 {
1137         int rc, rc2;
1138         struct writebuf wb = { .count = 0 };
1139         struct api_ws_server_req *wreq = closure;
1140
1141         if (api_ws_write_char(&wb, 'U')
1142          && api_ws_write_uint32(&wb, wreq->msgid)
1143          && api_ws_write_uint32(&wb, (uint32_t)afb_evt_event_id(event))
1144          && api_ws_write_string(&wb, afb_evt_event_name(event))) {
1145                 rc2 = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1146                 if (rc2 >= 0)
1147                         goto success;
1148         }
1149         ERROR("error while subscribing event");
1150 success:
1151         rc = afb_evt_remove_watch(wreq->client->listener, event);
1152         return rc;
1153 }
1154
1155 /******************* server part **********************************/
1156
1157 static int api_ws_server_connect(struct api_ws *api);
1158
1159 static int api_ws_server_listen_callback(sd_event_source *src, int fd, uint32_t revents, void *closure)
1160 {
1161         if ((revents & EPOLLIN) != 0)
1162                 api_ws_server_accept(closure);
1163         if ((revents & EPOLLHUP) != 0)
1164                 api_ws_server_connect(closure);
1165         return 0;
1166 }
1167
1168 static void api_ws_server_disconnect(struct api_ws *api)
1169 {
1170         if (api->server.listensrc != NULL) {
1171                 sd_event_source_unref(api->server.listensrc);
1172                 api->server.listensrc = NULL;
1173         }
1174         if (api->fd >= 0) {
1175                 close(api->fd);
1176                 api->fd = -1;
1177         }
1178 }
1179
1180 static int api_ws_server_connect(struct api_ws *api)
1181 {
1182         int rc;
1183
1184         /* ensure disconnected */
1185         api_ws_server_disconnect(api);
1186
1187         /* request the service object name */
1188         api->fd = api_ws_socket(api->path, 1);
1189         if (api->fd < 0)
1190                 ERROR("can't create socket %s", api->path);
1191         else {
1192                 /* listen for service */
1193                 rc = sd_event_add_io(afb_common_get_event_loop(), &api->server.listensrc, api->fd, EPOLLIN, api_ws_server_listen_callback, api);
1194                 if (rc >= 0)
1195                         return 0;
1196                 close(api->fd);
1197                 errno = -rc;
1198                 ERROR("can't add ws object %s", api->path);
1199         }
1200         return -1;
1201 }
1202
/*
 * Creates the websocket service for 'path' and starts listening on it.
 * Returns 0 on success, -1 when the api object cannot be made or when the
 * listening socket cannot be set up (the api object is then freed).
 */
int afb_api_ws_add_server(const char *path)
{
	struct api_ws *api;

	/* creates the ws api object */
	api = api_ws_make(path);
	if (api == NULL)
		return -1;

	/* connect for serving */
	if (api_ws_server_connect(api) < 0) {
		free(api);
		return -1;
	}

	return 0;
}
1226
1227