Implement subcall for services over websockets
[src/app-framework-binder.git] / src / afb-api-ws.c
1 /*
2  * Copyright (C) 2015, 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19 #define NO_PLUGIN_VERBOSE_MACRO
20
21 #include <stdlib.h>
22 #include <string.h>
23 #include <assert.h>
24 #include <fcntl.h>
25 #include <unistd.h>
26 #include <errno.h>
27 #include <endian.h>
28 #include <netdb.h>
29 #include <sys/types.h>
30 #include <sys/socket.h>
31 #include <sys/un.h>
32 #include <pthread.h>
33
34 #include <json-c/json.h>
35 #include <systemd/sd-event.h>
36
37 #include <afb/afb-req-itf.h>
38
39 #include "afb-common.h"
40
41 #include "afb-session.h"
42 #include "afb-cred.h"
43 #include "afb-ws.h"
44 #include "afb-msg-json.h"
45 #include "afb-apis.h"
46 #include "afb-api-so.h"
47 #include "afb-context.h"
48 #include "afb-evt.h"
49 #include "afb-xreq.h"
50 #include "verbose.h"
51 #include "sd-fds.h"
52
/* types completed further down in this file */
struct api_ws_memo;
struct api_ws_event;
struct api_ws_client;

/*
 * Message kinds: the first byte of a binary message selects
 * how the remainder of the message is decoded.
 */
#define CHAR_FOR_CALL             'C'	/* call of a verb, emitted by the client */
#define CHAR_FOR_ANSWER_SUCCESS   'T'	/* successful reply to a call */
#define CHAR_FOR_ANSWER_FAIL      'F'	/* failure reply to a call */
#define CHAR_FOR_EVT_BROADCAST    '*'	/* broadcast of an event */
#define CHAR_FOR_EVT_ADD          '+'	/* creation of an event */
#define CHAR_FOR_EVT_DEL          '-'	/* removal of an event */
#define CHAR_FOR_EVT_PUSH         '!'	/* push of an event */
#define CHAR_FOR_EVT_SUBSCRIBE    'S'	/* subscription of a request to an event */
#define CHAR_FOR_EVT_UNSUBSCRIBE  'U'	/* unsubscription of a request from an event */
#define CHAR_FOR_SUBCALL_CALL     'B'	/* subcall request, received by the client */
#define CHAR_FOR_SUBCALL_REPLY    'R'	/* reply to a subcall, emitted by the client */
68
69 /*
70  */
71 struct api_ws
72 {
73         char *path;             /* path of the object for the API */
74         char *api;              /* api name of the interface */
75         int fd;                 /* file descriptor */
76         pthread_mutex_t mutex;  /**< resource control */
77         union {
78                 struct {
79                         uint32_t id;
80                         struct afb_ws *ws;
81                         struct api_ws_event *events;
82                         struct api_ws_memo *memos;
83                 } client;
84                 struct {
85                         sd_event_source *listensrc; /**< systemd source for server socket */
86                 } server;
87         };
88 };
89
/*
 * Kinds of returned status.
 * NOTE(review): no use of these values is visible in this part of the file.
 */
#define RETOK   1
#define RETERR  2
#define RETRAW  3
93
94 /******************* websocket interface for client part **********************************/
95
96 static void api_ws_client_on_binary(void *closure, char *data, size_t size);
97
98 static const struct afb_ws_itf api_ws_client_ws_itf =
99 {
100         .on_close = NULL,
101         .on_text = NULL,
102         .on_binary = api_ws_client_on_binary,
103         .on_error = NULL,
104         .on_hangup = NULL
105 };
106
107 /******************* event structures for server part **********************************/
108
109 static void api_ws_server_event_add(void *closure, const char *event, int eventid);
110 static void api_ws_server_event_remove(void *closure, const char *event, int eventid);
111 static void api_ws_server_event_push(void *closure, const char *event, int eventid, struct json_object *object);
112 static void api_ws_server_event_broadcast(void *closure, const char *event, int eventid, struct json_object *object);
113
114 /* the interface for events pushing */
115 static const struct afb_evt_itf api_ws_server_evt_itf = {
116         .broadcast = api_ws_server_event_broadcast,
117         .push = api_ws_server_event_push,
118         .add = api_ws_server_event_add,
119         .remove = api_ws_server_event_remove
120 };
121
122 /******************* handling subcalls *****************************/
123
/**
 * Record of a pending subcall, kept on server side.
 * The records of one client are chained through 'next'.
 */
struct api_ws_subcall
{
	/** next subcall for the client */
	struct api_ws_subcall *next;

	/** the subcallid */
	uint32_t subcallid;

	/** callback on completion */
	void (*callback)(void*, int, struct json_object*);

	/** closure of the callback */
	void *closure;
};
135
/**
 * Data kept on client side while a subcall is running:
 * what is needed to send the reply back.
 */
struct api_ws_reply
{
	/** api descriptor whose websocket carries the reply */
	struct api_ws *apiws;

	/** subcallid to put in the reply */
	uint32_t subcallid;
};
144
145 /******************* client description part for server *****************************/
146
/* description of one client, as seen by the server */
struct api_ws_client
{
	const char *api;			/* the server ws-api */
	int refcount;				/* count of references */
	int fd;					/* file descriptor */
	pthread_mutex_t mutex;			/* resource control */
	struct afb_evt_listener *listener;	/* listener for events */
	struct afb_ws *ws;			/* websocket */
	struct afb_cred *cred;			/* credentials */
	struct api_ws_subcall *subcalls;	/* pending subcalls */
};
173
174 /******************* websocket interface for client part **********************************/
175
176 static void api_ws_server_on_binary(void *closure, char *data, size_t size);
177 static void api_ws_server_on_hangup(void *closure);
178
179 static const struct afb_ws_itf api_ws_server_ws_itf =
180 {
181         .on_close = NULL,
182         .on_text = NULL,
183         .on_binary = api_ws_server_on_binary,
184         .on_error = NULL,
185         .on_hangup = api_ws_server_on_hangup
186 };
187
188 /******************* ws request part for server *****************/
189
190 /*
191  * structure for a ws request
192  */
193 struct api_ws_server_req {
194         struct afb_xreq xreq;           /* the xreq */
195         struct api_ws_client *client;   /* the client of the request */
196         uint32_t msgid;                 /* the incoming request msgid */
197 };
198
199 static void api_ws_server_req_success_cb(struct afb_xreq *xreq, struct json_object *obj, const char *info);
200 static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status, const char *info);
201 static void api_ws_server_req_destroy_cb(struct afb_xreq *xreq);
202 static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event);
203 static int api_ws_server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event);
204 static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure);
205
206 const struct afb_xreq_query_itf afb_api_ws_xreq_itf = {
207         .success = api_ws_server_req_success_cb,
208         .fail = api_ws_server_req_fail_cb,
209         .unref = api_ws_server_req_destroy_cb,
210         .subcall = api_ws_server_req_subcall_cb,
211         .subscribe = api_ws_server_req_subscribe_cb,
212         .unsubscribe = api_ws_server_req_unsubscribe_cb
213 };
214
215 /******************* common part **********************************/
216
217 /*
218  * create a structure api_ws not connected to the 'path'.
219  */
220 static struct api_ws *api_ws_make(const char *path)
221 {
222         struct api_ws *api;
223         size_t length;
224
225         /* allocates the structure */
226         length = strlen(path);
227         api = calloc(1, sizeof *api + 1 + length);
228         if (api == NULL) {
229                 errno = ENOMEM;
230                 goto error;
231         }
232
233         /* path is copied after the struct */
234         api->path = (char*)(api+1);
235         memcpy(api->path, path, length + 1);
236
237         /* api name is at the end of the path */
238         while (length && path[length - 1] != '/' && path[length - 1] != ':')
239                 length = length - 1;
240         api->api = &api->path[length];
241         if (api->api == NULL || !afb_apis_is_valid_api_name(api->api)) {
242                 errno = EINVAL;
243                 goto error2;
244         }
245         pthread_mutex_init(&api->mutex, NULL);
246
247         api->fd = -1;
248         return api;
249
250 error2:
251         free(api);
252 error:
253         return NULL;
254 }
255
/*
 * Creates a unix stream socket for 'path'.
 *
 * When 'server' is not zero, any existing file 'path' is unlinked
 * and the socket is bound to 'path'. Otherwise the socket is
 * connected to 'path'.
 *
 * Returns the file descriptor of the socket or a negative value on
 * error, with errno set (ENAMETOOLONG when 'path' doesn't fit the
 * socket address).
 */
static int api_ws_socket_unix(const char *path, int server)
{
	int fd, rc, saved_errno;
	struct sockaddr_un addr;
	size_t length;

	/* the path and its terminating zero must fit in sun_path */
	length = strlen(path);
	if (length >= sizeof addr.sun_path) {
		errno = ENAMETOOLONG;
		return -1;
	}

	/* removes a possibly remaining socket file */
	if (server)
		unlink(path);

	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (fd < 0)
		return fd;

	memset(&addr, 0, sizeof addr);
	addr.sun_family = AF_UNIX;
	memcpy(addr.sun_path, path, length + 1);
	if (server)
		rc = bind(fd, (struct sockaddr *) &addr, (socklen_t)(sizeof addr));
	else
		rc = connect(fd, (struct sockaddr *) &addr, (socklen_t)(sizeof addr));
	if (rc < 0) {
		/* close() must not hide the reason of the failure */
		saved_errno = errno;
		close(fd);
		errno = saved_errno;
		return rc;
	}
	return fd;
}
289
/*
 * Creates an IPv4 stream socket for 'path' of the form
 * "host:service/api".
 *
 * When 'server' is not zero, the socket is bound to the resolved
 * address, otherwise it is connected to it. The addresses returned
 * by getaddrinfo are tried in order until one succeeds.
 *
 * Returns the file descriptor of the socket or -1 on error, with
 * errno set (EINVAL when the path is malformed or can't be resolved).
 */
static int api_ws_socket_inet(const char *path, int server)
{
	int rc, fd, on, saved_errno;
	const char *service, *host, *api;
	struct addrinfo hint, *rai, *iai;

	/* scan the uri */
	api = strrchr(path, '/');
	service = strrchr(path, ':');
	if (api == NULL || service == NULL || api < service) {
		errno = EINVAL;
		return -1;
	}
	host = strndupa(path, (size_t)(service - path));
	service++; /* skips the colon */
	service = strndupa(service, (size_t)(api - service));

	/* get addr */
	memset(&hint, 0, sizeof hint);
	hint.ai_family = AF_INET;
	hint.ai_socktype = SOCK_STREAM;
	rc = getaddrinfo(host, service, &hint, &rai);
	if (rc != 0) {
		errno = EINVAL;
		return -1;
	}

	/* get the socket */
	saved_errno = 0;
	for (iai = rai ; iai != NULL ; iai = iai->ai_next) {
		fd = socket(iai->ai_family, iai->ai_socktype, iai->ai_protocol);
		if (fd < 0) {
			saved_errno = errno;
			continue;
		}
		if (server) {
			/* to be effective, the option is set before binding */
			on = 1;
			setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof on);
			rc = bind(fd, iai->ai_addr, iai->ai_addrlen);
		} else {
			rc = connect(fd, iai->ai_addr, iai->ai_addrlen);
		}
		if (rc == 0) {
			freeaddrinfo(rai);
			return fd;
		}
		/* records the reason of the failure before closing */
		saved_errno = errno;
		close(fd);
	}
	freeaddrinfo(rai);
	if (saved_errno != 0)
		errno = saved_errno;
	return -1;
}
338
/*
 * Gets the socket for 'path'.
 *
 * The prefix of 'path' selects how the socket is obtained:
 *  - "sd:"   the descriptor is queried with sd_fds_for
 *  - "unix:" a unix socket is created
 *  - else    an inet socket is created
 *
 * For unix and inet sockets, when 'server' is not zero, the socket
 * is set to listen. The descriptor is finally set close-on-exec
 * and non-blocking.
 *
 * Returns the file descriptor or a negative value on error.
 */
static int api_ws_socket(const char *path, int server)
{
	int fd, rc, saved_errno;

	/* check for systemd socket */
	if (0 == strncmp(path, "sd:", 3))
		fd = sd_fds_for(path + 3);
	else {
		/* check for unix socket */
		if (0 == strncmp(path, "unix:", 5))
			/* unix socket */
			fd = api_ws_socket_unix(path + 5, server);
		else
			/* inet socket */
			fd = api_ws_socket_inet(path, server);

		if (fd >= 0 && server) {
			/* NOTE(review): the socket is already bound at this point */
			rc = 1;
			setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &rc, sizeof rc);

			/* a server socket that doesn't listen is of no use */
			rc = listen(fd, 5);
			if (rc < 0) {
				saved_errno = errno;
				close(fd);
				errno = saved_errno;
				return -1;
			}
		}
	}
	/* configure the socket */
	if (fd >= 0) {
		fcntl(fd, F_SETFD, FD_CLOEXEC);
		fcntl(fd, F_SETFL, O_NONBLOCK);
	}
	return fd;
}
368
369 /******************* serialisation part **********************************/
370
/* cursor for decoding a received message */
struct readbuf
{
	char *head;	/* next byte to read */
	char *end;	/* first byte after the message */
};
375
/* maximum count of parts of a message being composed */
#define WRITEBUF_COUNT_MAX  32

/* a message being composed, as a vector of parts */
struct writebuf
{
	/* the parts of the message */
	struct iovec iovec[WRITEBUF_COUNT_MAX];

	/* storage for the parts that are characters or integers */
	uint32_t uints[WRITEBUF_COUNT_MAX];

	/* count of parts currently recorded */
	int count;
};
383
384 static char *api_ws_read_get(struct readbuf *rb, uint32_t length)
385 {
386         char *before = rb->head;
387         char *after = before + length;
388         if (after > rb->end)
389                 return 0;
390         rb->head = after;
391         return before;
392 }
393
394 static int api_ws_read_char(struct readbuf *rb, char *value)
395 {
396         if (rb->head >= rb->end)
397                 return 0;
398         *value = *rb->head++;
399         return 1;
400 }
401
402 static int api_ws_read_uint32(struct readbuf *rb, uint32_t *value)
403 {
404         char *after = rb->head + sizeof *value;
405         if (after > rb->end)
406                 return 0;
407         memcpy(value, rb->head, sizeof *value);
408         rb->head = after;
409         *value = le32toh(*value);
410         return 1;
411 }
412
/*
 * Reads a string of 'rb': a 32 bits size followed by the
 * characters, terminating zero included in the size.
 *
 * On success, 'value' points the string inside the buffer and,
 * if 'length' isn't NULL, it receives the length of the string
 * (terminating zero excluded).
 *
 * Returns 1 on success or 0 on error.
 */
static int api_ws_read_string(struct readbuf *rb, const char **value, size_t *length)
{
	uint32_t size;
	const char *text;

	if (!api_ws_read_uint32(rb, &size))
		return 0;
	if (size == 0)
		return 0;
	if (length != NULL)
		*length = (size_t)(size - 1);

	text = api_ws_read_get(rb, size);
	*value = text;
	if (text == NULL)
		return 0;

	/* the last character must be the terminating zero */
	return text[size - 1] == 0;
}
422
/*
 * Reads a JSON object of 'rb', transmitted as a string.
 *
 * A text that can't be parsed and that isn't "null" is returned
 * as a JSON string holding that text.
 *
 * Returns 1 on success, 'object' being set, or 0 on error.
 */
static int api_ws_read_object(struct readbuf *rb, struct json_object **object)
{
	const char *text;
	struct json_object *parsed;

	if (!api_ws_read_string(rb, &text, NULL))
		return 0;

	parsed = json_tokener_parse(text);
	if (parsed == NULL && strcmp(text, "null") != 0)
		parsed = json_object_new_string(text);
	*object = parsed;
	return 1;
}
436
437 static int api_ws_write_put(struct writebuf *wb, const void *value, size_t length)
438 {
439         int i = wb->count;
440         if (i == WRITEBUF_COUNT_MAX)
441                 return 0;
442         wb->iovec[i].iov_base = (void*)value;
443         wb->iovec[i].iov_len = length;
444         wb->count = i + 1;
445         return 1;
446 }
447
448 static int api_ws_write_char(struct writebuf *wb, char value)
449 {
450         int i = wb->count;
451         if (i == WRITEBUF_COUNT_MAX)
452                 return 0;
453         *(char*)&wb->uints[i] = value;
454         wb->iovec[i].iov_base = &wb->uints[i];
455         wb->iovec[i].iov_len = 1;
456         wb->count = i + 1;
457         return 1;
458 }
459
460 static int api_ws_write_uint32(struct writebuf *wb, uint32_t value)
461 {
462         int i = wb->count;
463         if (i == WRITEBUF_COUNT_MAX)
464                 return 0;
465         wb->uints[i] = htole32(value);
466         wb->iovec[i].iov_base = &wb->uints[i];
467         wb->iovec[i].iov_len = sizeof wb->uints[i];
468         wb->count = i + 1;
469         return 1;
470 }
471
/*
 * Adds to 'wb' the string 'value' of known 'length': its size on
 * 32 bits, terminating zero included, then its characters.
 * Returns 1 on success or 0 when the size doesn't fit 32 bits
 * or when 'wb' is full.
 */
static int api_ws_write_string_length(struct writebuf *wb, const char *value, size_t length)
{
	size_t total = length + 1; /* counts the terminating zero */
	uint32_t len = (uint32_t)total;

	if ((size_t)len != total || len == 0)
		return 0;
	return api_ws_write_uint32(wb, len) && api_ws_write_put(wb, value, total);
}
477
/*
 * Adds to 'wb' the zero terminated string 'value'.
 * Returns 1 on success or 0 on error.
 */
static int api_ws_write_string(struct writebuf *wb, const char *value)
{
	size_t length = strlen(value);

	return api_ws_write_string_length(wb, value, length);
}
482
483 static int api_ws_write_object(struct writebuf *wb, struct json_object *object)
484 {
485         const char *string = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
486         return string != NULL && api_ws_write_string(wb, string);
487 }
488
489 /******************* client part **********************************/
490
/*
 * Record of a request sent and not yet replied, on client side.
 * The records of an api are chained through 'next'.
 */
struct api_ws_memo {
	/* the next memo */
	struct api_ws_memo *next;

	/* the ws api */
	struct api_ws *api;

	/* the request handle, referenced while the memo exists */
	struct afb_xreq *xreq;

	/* the message identifier */
	uint32_t msgid;
};
500
501 struct api_ws_event
502 {
503         struct api_ws_event *next;
504         struct afb_event event;
505         int eventid;
506         int refcount;
507 };
508
509 /* search a memorized request */
510 static struct api_ws_memo *api_ws_client_memo_search(struct api_ws *api, uint32_t msgid)
511 {
512         struct api_ws_memo *memo;
513
514         memo = api->client.memos;
515         while (memo != NULL && memo->msgid != msgid)
516                 memo = memo->next;
517
518         return memo;
519 }
520
521 /* search the event */
522 static struct api_ws_event *api_ws_client_event_search(struct api_ws *api, uint32_t eventid, const char *name)
523 {
524         struct api_ws_event *ev;
525
526         ev = api->client.events;
527         while (ev != NULL && (ev->eventid != eventid || 0 != strcmp(afb_evt_event_name(ev->event), name)))
528                 ev = ev->next;
529
530         return ev;
531 }
532
533
534 /* allocates and init the memorizing data */
535 static struct api_ws_memo *api_ws_client_memo_make(struct api_ws *api, struct afb_xreq *xreq)
536 {
537         struct api_ws_memo *memo;
538
539         memo = malloc(sizeof *memo);
540         if (memo != NULL) {
541                 afb_xreq_addref(xreq);
542                 memo->xreq = xreq;
543                 do { memo->msgid = ++api->client.id; } while(api_ws_client_memo_search(api, memo->msgid) != NULL);
544                 memo->api = api;
545                 memo->next = api->client.memos;
546                 api->client.memos = memo;
547         }
548         return memo;
549 }
550
551 /* free and release the memorizing data */
552 static void api_ws_client_memo_destroy(struct api_ws_memo *memo)
553 {
554         struct api_ws_memo **prv;
555
556         prv = &memo->api->client.memos;
557         while (*prv != NULL) {
558                 if (*prv == memo) {
559                         *prv = memo->next;
560                         break;
561                 }
562                 prv = &(*prv)->next;
563         }
564
565         afb_xreq_unref(memo->xreq);
566         free(memo);
567 }
568
/*
 * Reads the data of an event: its identifier then its name.
 * Returns 1 on success or 0 on error.
 */
static int api_ws_client_msg_event_read(struct readbuf *rb, uint32_t *eventid, const char **name)
{
	if (!api_ws_read_uint32(rb, eventid))
		return 0;
	return api_ws_read_string(rb, name, NULL);
}
574
/*
 * Reads the data of an event and retrieves to 'ev' the matching
 * known event.
 * Returns 1 on success or 0, after logging, when the message is
 * invalid or the event unknown.
 */
static int api_ws_client_msg_event_get(struct api_ws *api, struct readbuf *rb, struct api_ws_event **ev)
{
	uint32_t eventid;
	const char *name;
	struct api_ws_event *found;

	/* decodes the message */
	if (!api_ws_client_msg_event_read(rb, &eventid, &name)) {
		ERROR("Invalid message");
		return 0;
	}

	/* retrieves the event */
	found = api_ws_client_event_search(api, eventid, name);
	*ev = found;
	if (found != NULL)
		return 1;

	ERROR("event %s not found", name);
	return 0;
}
596
/*
 * Reads a message identifier and retrieves to 'memo' the
 * memorized request having that identifier.
 * Returns 1 on success or 0, after logging, when the message is
 * invalid or the request unknown.
 */
static int api_ws_client_msg_memo_get(struct api_ws *api, struct readbuf *rb, struct api_ws_memo **memo)
{
	uint32_t msgid;
	struct api_ws_memo *found;

	/* decodes the message identifier */
	if (!api_ws_read_uint32(rb, &msgid)) {
		ERROR("Invalid message");
		return 0;
	}

	/* retrieves the memo */
	found = api_ws_client_memo_search(api, msgid);
	*memo = found;
	if (found != NULL)
		return 1;

	ERROR("message not found");
	return 0;
}
617
/*
 * Reads a subscription message: the memorized request then the event.
 * Returns 1 on success or 0 on error.
 */
static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_memo **memo, struct api_ws_event **ev)
{
	if (!api_ws_client_msg_memo_get(api, rb, memo))
		return 0;
	return api_ws_client_msg_event_get(api, rb, ev);
}
623
624 /* adds an event */
625 static void api_ws_client_event_create(struct api_ws *api, struct readbuf *rb)
626 {
627         size_t offset;
628         const char *name;
629         uint32_t eventid;
630         struct api_ws_event *ev;
631
632         /* get event data from the message */
633         offset = api_ws_client_msg_event_read(rb, &eventid, &name);
634         if (offset == 0) {
635                 ERROR("Invalid message");
636                 return;
637         }
638
639         /* check conflicts */
640         ev = api_ws_client_event_search(api, eventid, name);
641         if (ev != NULL) {
642                 ev->refcount++;
643                 return;
644         }
645
646         /* no conflict, try to add it */
647         ev = malloc(sizeof *ev);
648         if (ev != NULL) {
649                 ev->event = afb_evt_create_event(name);
650                 if (ev->event.closure == NULL)
651                         free(ev);
652                 else {
653                         ev->refcount = 1;
654                         ev->eventid = eventid;
655                         ev->next = api->client.events;
656                         api->client.events = ev;
657                         return;
658                 }
659         }
660         ERROR("can't create event %s, out of memory", name);
661 }
662
663 /* removes an event */
664 static void api_ws_client_event_drop(struct api_ws *api, struct readbuf *rb)
665 {
666         struct api_ws_event *ev, **prv;
667
668         /* retrieves the event */
669         if (!api_ws_client_msg_event_get(api, rb, &ev))
670                 return;
671
672         /* decrease the reference count */
673         if (--ev->refcount)
674                 return;
675
676         /* unlinks the event */
677         prv = &api->client.events;
678         while (*prv != ev)
679                 prv = &(*prv)->next;
680         *prv = ev->next;
681
682         /* destroys the event */
683         afb_event_drop(ev->event);
684         free(ev);
685 }
686
687 /* subscribes an event */
688 static void api_ws_client_event_subscribe(struct api_ws *api, struct readbuf *rb)
689 {
690         struct api_ws_event *ev;
691         struct api_ws_memo *memo;
692
693         if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) {
694                 /* subscribe the request from the event */
695                 if (afb_xreq_subscribe(memo->xreq, ev->event) < 0)
696                         ERROR("can't subscribe: %m");
697         }
698 }
699
700 /* unsubscribes an event */
701 static void api_ws_client_event_unsubscribe(struct api_ws *api, struct readbuf *rb)
702 {
703         struct api_ws_event *ev;
704         struct api_ws_memo *memo;
705
706         if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) {
707                 /* unsubscribe the request from the event */
708                 if (afb_xreq_unsubscribe(memo->xreq, ev->event) < 0)
709                         ERROR("can't unsubscribe: %m");
710         }
711 }
712
/*
 * Handles the message of broadcast: the name of the event and
 * its object are read then broadcasted locally.
 */
static void api_ws_client_event_broadcast(struct api_ws *api, struct readbuf *rb)
{
	const char *event;
	struct json_object *object;

	if (!api_ws_read_string(rb, &event, NULL) || !api_ws_read_object(rb, &object)) {
		ERROR("unreadable broadcasted event");
		return;
	}
	afb_evt_broadcast(event, object);
}
724
725 /* pushs an event */
726 static void api_ws_client_event_push(struct api_ws *api, struct readbuf *rb)
727 {
728         struct api_ws_event *ev;
729         struct json_object *object;
730
731         if (api_ws_client_msg_event_get(api, rb, &ev) && api_ws_read_object(rb, &object))
732                 afb_event_push(ev->event, object);
733         else
734                 ERROR("unreadable push event");
735 }
736
737 static void api_ws_client_reply_success(struct api_ws *api, struct readbuf *rb)
738 {
739         struct api_ws_memo *memo;
740         struct json_object *object;
741         const char *info;
742         uint32_t flags;
743
744         /* retrieve the message data */
745         if (!api_ws_client_msg_memo_get(api, rb, &memo))
746                 return;
747
748         if (api_ws_read_uint32(rb, &flags)
749          && api_ws_read_string(rb, &info, NULL)
750          && api_ws_read_object(rb, &object)) {
751                 memo->xreq->context.flags = (unsigned)flags;
752                 afb_xreq_success(memo->xreq, object, *info ? info : NULL);
753         } else {
754                 /* failing to have the answer */
755                 afb_xreq_fail(memo->xreq, "error", "ws error");
756         }
757         api_ws_client_memo_destroy(memo);
758 }
759
760 static void api_ws_client_reply_fail(struct api_ws *api, struct readbuf *rb)
761 {
762         struct api_ws_memo *memo;
763         const char *info, *status;
764         uint32_t flags;
765
766         /* retrieve the message data */
767         if (!api_ws_client_msg_memo_get(api, rb, &memo))
768                 return;
769
770         if (api_ws_read_uint32(rb, &flags)
771          && api_ws_read_string(rb, &status, NULL)
772          && api_ws_read_string(rb, &info, NULL)) {
773                 memo->xreq->context.flags = (unsigned)flags;
774                 afb_xreq_fail(memo->xreq, status, *info ? info : NULL);
775         } else {
776                 /* failing to have the answer */
777                 afb_xreq_fail(memo->xreq, "error", "ws error");
778         }
779         api_ws_client_memo_destroy(memo);
780 }
781
782 /* send a subcall reply */
783 static void api_ws_client_send_subcall_reply(struct api_ws_reply *reply, int iserror, json_object *object)
784 {
785         int rc;
786         struct writebuf wb = { .count = 0 };
787         char ie = (char)!!iserror;
788
789         if (!api_ws_write_char(&wb, CHAR_FOR_SUBCALL_REPLY)
790          || !api_ws_write_uint32(&wb, reply->subcallid)
791          || !api_ws_write_char(&wb, ie)
792          || !api_ws_write_object(&wb, object)) {
793                 /* write error ? */
794                 return;
795         }
796
797         rc = afb_ws_binary_v(reply->apiws->client.ws, wb.iovec, wb.count);
798         if (rc >= 0)
799                 return;
800         ERROR("error while sending subcall reply");
801 }
802
803 /* callback for subcall reply */
804 static void api_ws_client_subcall_reply_cb(void *closure, int iserror, json_object *object)
805 {
806         api_ws_client_send_subcall_reply(closure, iserror, object);
807         free(closure);
808 }
809
810 /* received a subcall request */
811 static void api_ws_client_subcall(struct api_ws *apiws, struct readbuf *rb)
812 {
813         struct api_ws_reply *reply;
814         struct api_ws_memo *memo;
815         const char *api, *verb;
816         uint32_t subcallid;
817         struct json_object *object;
818
819         reply = malloc(sizeof *reply);
820         if (!reply)
821                 return;
822
823         /* retrieve the message data */
824         if (!api_ws_client_msg_memo_get(apiws, rb, &memo))
825                 return;
826
827         if (api_ws_read_uint32(rb, &subcallid)
828          && api_ws_read_string(rb, &api, NULL)
829          && api_ws_read_string(rb, &verb, NULL)
830          && api_ws_read_object(rb, &object)) {
831                 reply->apiws = apiws;
832                 reply->subcallid = subcallid;
833                 afb_xreq_subcall(memo->xreq, api, verb, object, api_ws_client_subcall_reply_cb, reply);
834         }
835 }
836
837 /* callback when receiving binary data */
838 static void api_ws_client_on_binary(void *closure, char *data, size_t size)
839 {
840         if (size > 0) {
841                 struct api_ws *apiws = closure;
842                 struct readbuf rb = { .head = data, .end = data + size };
843
844                 pthread_mutex_lock(&apiws->mutex);
845                 switch (*rb.head++) {
846                 case CHAR_FOR_ANSWER_SUCCESS: /* success */
847                         api_ws_client_reply_success(apiws, &rb);
848                         break;
849                 case CHAR_FOR_ANSWER_FAIL: /* fail */
850                         api_ws_client_reply_fail(apiws, &rb);
851                         break;
852                 case CHAR_FOR_EVT_BROADCAST: /* broadcast */
853                         api_ws_client_event_broadcast(apiws, &rb);
854                         break;
855                 case CHAR_FOR_EVT_ADD: /* creates the event */
856                         api_ws_client_event_create(apiws, &rb);
857                         break;
858                 case CHAR_FOR_EVT_DEL: /* drops the event */
859                         api_ws_client_event_drop(apiws, &rb);
860                         break;
861                 case CHAR_FOR_EVT_PUSH: /* pushs the event */
862                         api_ws_client_event_push(apiws, &rb);
863                         break;
864                 case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
865                         api_ws_client_event_subscribe(apiws, &rb);
866                         break;
867                 case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
868                         api_ws_client_event_unsubscribe(apiws, &rb);
869                         break;
870                 case CHAR_FOR_SUBCALL_CALL: /* subcall */
871                         api_ws_client_subcall(apiws, &rb);
872                         break;
873                 default: /* unexpected message */
874                         /* TODO: close the connection */
875                         break;
876                 }
877                 pthread_mutex_unlock(&apiws->mutex);
878         }
879         free(data);
880 }
881
882 /* on call, propagate it to the ws service */
883 static void api_ws_client_call_cb(void * closure, struct afb_xreq *xreq)
884 {
885         int rc;
886         struct api_ws_memo *memo;
887         struct writebuf wb = { .count = 0 };
888         const char *raw;
889         size_t szraw;
890         struct api_ws *apiws = closure;
891
892         pthread_mutex_lock(&apiws->mutex);
893
894         /* create the recording data */
895         memo = api_ws_client_memo_make(apiws, xreq);
896         if (memo == NULL) {
897                 afb_xreq_fail_f(xreq, "error", "out of memory");
898                 goto end;
899         }
900
901         /* creates the call message */
902         raw = afb_xreq_raw(xreq, &szraw);
903         if (raw == NULL)
904                 goto internal_error;
905         if (!api_ws_write_char(&wb, CHAR_FOR_CALL)
906          || !api_ws_write_uint32(&wb, memo->msgid)
907          || !api_ws_write_uint32(&wb, (uint32_t)xreq->context.flags)
908          || !api_ws_write_string(&wb, xreq->verb)
909          || !api_ws_write_string(&wb, afb_session_uuid(xreq->context.session))
910          || !api_ws_write_string_length(&wb, raw, szraw))
911                 goto overflow;
912
913         /* send */
914         rc = afb_ws_binary_v(apiws->client.ws, wb.iovec, wb.count);
915         if (rc >= 0)
916                 goto end;
917
918         afb_xreq_fail(xreq, "error", "websocket sending error");
919         goto clean_memo;
920
921 internal_error:
922         afb_xreq_fail(xreq, "error", "internal: raw is NULL!");
923         goto clean_memo;
924
925 overflow:
926         afb_xreq_fail(xreq, "error", "overflow: size doesn't match 32 bits!");
927
928 clean_memo:
929         api_ws_client_memo_destroy(memo);
930 end:
931         pthread_mutex_unlock(&apiws->mutex);
932 }
933
934 static int api_ws_service_start_cb(void *closure, int share_session, int onneed)
935 {
936         struct api_ws *api = closure;
937
938         /* not an error when onneed */
939         if (onneed != 0)
940                 return 0;
941
942         /* already started: it is an error */
943         ERROR("The WS binding %s is not a startable service", api->path);
944         return -1;
945 }
946
947 /*  */
948 static void api_ws_client_disconnect(struct api_ws *api)
949 {
950         if (api->fd >= 0) {
951                 afb_ws_destroy(api->client.ws);
952                 api->client.ws = NULL;
953                 close(api->fd);
954                 api->fd = -1;
955         }
956 }
957
958 /*  */
959 static int api_ws_client_connect(struct api_ws *api)
960 {
961         struct afb_ws *ws;
962         int fd;
963
964         fd = api_ws_socket(api->path, 0);
965         if (fd >= 0) {
966                 ws = afb_ws_create(afb_common_get_event_loop(), fd, &api_ws_client_ws_itf, api);
967                 if (ws != NULL) {
968                         api->client.ws = ws;
969                         api->fd = fd;
970                         return 0;
971                 }
972                 close(fd);
973         }
974         return -1;
975 }
976
/*
 * Interface of the API recorded by afb_api_ws_add_client:
 *  - calls are forwarded to the ws service by api_ws_client_call_cb
 *  - start requests are handled by api_ws_service_start_cb
 */
static struct afb_api_itf ws_api_itf = {
        .call = api_ws_client_call_cb,
        .service_start = api_ws_service_start_cb
};
981
982 /* adds a afb-ws-service client api */
983 int afb_api_ws_add_client(const char *path)
984 {
985         int rc;
986         struct api_ws *api;
987         struct afb_api afb_api;
988
989         /* create the ws client api */
990         api = api_ws_make(path);
991         if (api == NULL)
992                 goto error;
993
994         /* connect to the service */
995         rc = api_ws_client_connect(api);
996         if (rc < 0) {
997                 ERROR("can't connect to ws service %s", api->path);
998                 goto error2;
999         }
1000
1001         /* record it as an API */
1002         afb_api.closure = api;
1003         afb_api.itf = &ws_api_itf;
1004         if (afb_apis_add(api->api, afb_api) < 0)
1005                 goto error3;
1006
1007         return 0;
1008
1009 error3:
1010         api_ws_client_disconnect(api);
1011 error2:
1012         free(api);
1013 error:
1014         return -1;
1015 }
1016
1017 /******************* client description part for server *****************************/
1018
1019 static void api_ws_server_client_unref(struct api_ws_client *client)
1020 {
1021         struct api_ws_subcall *sc, *nsc;
1022
1023         if (!__atomic_sub_fetch(&client->refcount, 1, __ATOMIC_RELAXED)) {
1024                 afb_evt_listener_unref(client->listener);
1025                 afb_ws_destroy(client->ws);
1026                 nsc = client->subcalls;
1027                 while (nsc) {
1028                         sc= nsc;
1029                         nsc = sc->next;
1030                         sc->callback(sc->closure, 1, NULL);
1031                         free(sc);
1032                 }
1033                 afb_cred_unref(client->cred);
1034                 free(client);
1035         }
1036 }
1037
1038 static void api_ws_server_client_addref(struct api_ws_client *client)
1039 {
1040         __atomic_add_fetch(&client->refcount, 1, __ATOMIC_RELAXED);
1041 }
1042
1043 /* on call, propagate it to the ws service */
1044 static void api_ws_server_on_call(struct api_ws_client *client, struct readbuf *rb)
1045 {
1046         struct api_ws_server_req *wreq;
1047         char *cverb;
1048         const char *uuid, *verb;
1049         uint32_t flags, msgid;
1050         size_t lenverb;
1051         struct json_object *object;
1052
1053         api_ws_server_client_addref(client);
1054
1055         /* reads the call message data */
1056         if (!api_ws_read_uint32(rb, &msgid)
1057          || !api_ws_read_uint32(rb, &flags)
1058          || !api_ws_read_string(rb, &verb, &lenverb)
1059          || !api_ws_read_string(rb, &uuid, NULL)
1060          || !api_ws_read_object(rb, &object))
1061                 goto overflow;
1062
1063         /* create the request */
1064         wreq = malloc(++lenverb + sizeof *wreq);
1065         if (wreq == NULL)
1066                 goto out_of_memory;
1067
1068         afb_xreq_init(&wreq->xreq, &afb_api_ws_xreq_itf);
1069         wreq->client = client;
1070         wreq->msgid = msgid;
1071         cverb = (char*)&wreq[1];
1072         memcpy(cverb, verb, lenverb);
1073
1074         /* init the context */
1075         if (afb_context_connect(&wreq->xreq.context, uuid, NULL) < 0)
1076                 goto unconnected;
1077         wreq->xreq.context.flags = flags;
1078
1079         /* makes the call */
1080         wreq->xreq.cred = afb_cred_addref(client->cred);
1081         wreq->xreq.api = client->api;
1082         wreq->xreq.verb = cverb;
1083         wreq->xreq.json = object;
1084         afb_apis_call(&wreq->xreq);
1085         afb_xreq_unref(&wreq->xreq);
1086         return;
1087
1088 unconnected:
1089         free(wreq);
1090 out_of_memory:
1091         json_object_put(object);
1092 overflow:
1093         api_ws_server_client_unref(client);
1094 }
1095
1096 /* on subcall reply */
1097 static void api_ws_server_on_subcall_reply(struct api_ws_client *client, struct readbuf *rb)
1098 {
1099         char iserror;
1100         uint32_t subcallid;
1101         struct json_object *object;
1102         struct api_ws_subcall *sc, **psc;
1103
1104         /* reads the call message data */
1105         if (!api_ws_read_uint32(rb, &subcallid)
1106          || !api_ws_read_char(rb, &iserror)
1107          || !api_ws_read_object(rb, &object)) {
1108                 /* TODO bad protocol */
1109                 return;
1110         }
1111
1112         /* search the subcall and unlink it */
1113         pthread_mutex_lock(&client->mutex);
1114         psc = &client->subcalls;
1115         while ((sc = *psc) && sc->subcallid != subcallid)
1116                 psc = &sc->next;
1117         if (!sc) {
1118                 pthread_mutex_unlock(&client->mutex);
1119                 /* TODO subcall not found */
1120         } else {
1121                 *psc = sc->next;
1122                 pthread_mutex_unlock(&client->mutex);
1123                 sc->callback(sc->closure, (int)iserror, object);
1124                 free(sc);
1125         }
1126         json_object_put(object);
1127 }
1128
1129 /* callback when receiving binary data */
1130 static void api_ws_server_on_binary(void *closure, char *data, size_t size)
1131 {
1132         if (size > 0) {
1133                 struct readbuf rb = { .head = data, .end = data + size };
1134                 switch (*rb.head++) {
1135                 case CHAR_FOR_CALL:
1136                         api_ws_server_on_call(closure, &rb);
1137                         break;
1138                 case CHAR_FOR_SUBCALL_REPLY:
1139                         api_ws_server_on_subcall_reply(closure, &rb);
1140                         break;
1141                 default: /* unexpected message */
1142                         /* TODO: close the connection */
1143                         break;
1144                 }
1145         }
1146         free(data);
1147 }
1148
1149 /* callback when receiving a hangup */
1150 static void api_ws_server_on_hangup(void *closure)
1151 {
1152         struct api_ws_client *client = closure;
1153
1154         /* close the socket */
1155         if (client->fd >= 0) {
1156                 close(client->fd);
1157                 client->fd = -1;
1158         }
1159
1160         /* release the client */
1161         api_ws_server_client_unref(client);
1162 }
1163
1164 static void api_ws_server_accept(struct api_ws *api)
1165 {
1166         struct api_ws_client *client;
1167         struct sockaddr addr;
1168         socklen_t lenaddr;
1169
1170         client = calloc(1, sizeof *client);
1171         if (client != NULL) {
1172                 client->listener = afb_evt_listener_create(&api_ws_server_evt_itf, client);
1173                 if (client->listener != NULL) {
1174                         lenaddr = (socklen_t)sizeof addr;
1175                         client->fd = accept(api->fd, &addr, &lenaddr);
1176                         if (client->fd >= 0) {
1177                                 client->cred = afb_cred_create_for_socket(client->fd);
1178                                 fcntl(client->fd, F_SETFD, FD_CLOEXEC);
1179                                 fcntl(client->fd, F_SETFL, O_NONBLOCK);
1180                                 client->ws = afb_ws_create(afb_common_get_event_loop(), client->fd, &api_ws_server_ws_itf, client);
1181                                 if (client->ws != NULL) {
1182                                         client->api = api->api;
1183                                         client->refcount = 1;
1184                                         client->subcalls = NULL;
1185                                         return;
1186                                 }
1187                                 afb_cred_unref(client->cred);
1188                                 close(client->fd);
1189                         }
1190                         afb_evt_listener_unref(client->listener);
1191                 }
1192                 free(client);
1193         }
1194 }
1195
1196 /******************* server part: manage events **********************************/
1197
1198 static void api_ws_server_event_send(struct api_ws_client *client, char order, const char *event, int eventid, const char *data)
1199 {
1200         int rc;
1201         struct writebuf wb = { .count = 0 };
1202
1203         if (api_ws_write_char(&wb, order)
1204          && api_ws_write_uint32(&wb, eventid)
1205          && api_ws_write_string(&wb, event)
1206          && (data == NULL || api_ws_write_string(&wb, data))) {
1207                 rc = afb_ws_binary_v(client->ws, wb.iovec, wb.count);
1208                 if (rc >= 0)
1209                         return;
1210         }
1211         ERROR("error while sending %c for event %s", order, event);
1212 }
1213
1214 static void api_ws_server_event_add(void *closure, const char *event, int eventid)
1215 {
1216         api_ws_server_event_send(closure, CHAR_FOR_EVT_ADD, event, eventid, NULL);
1217 }
1218
1219 static void api_ws_server_event_remove(void *closure, const char *event, int eventid)
1220 {
1221         api_ws_server_event_send(closure, CHAR_FOR_EVT_DEL, event, eventid, NULL);
1222 }
1223
1224 static void api_ws_server_event_push(void *closure, const char *event, int eventid, struct json_object *object)
1225 {
1226         const char *data = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
1227         api_ws_server_event_send(closure, CHAR_FOR_EVT_PUSH, event, eventid, data ? : "null");
1228         json_object_put(object);
1229 }
1230
1231 static void api_ws_server_event_broadcast(void *closure, const char *event, int eventid, struct json_object *object)
1232 {
1233         int rc;
1234         struct api_ws_client *client = closure;
1235
1236         struct writebuf wb = { .count = 0 };
1237
1238         if (api_ws_write_char(&wb, CHAR_FOR_EVT_BROADCAST) && api_ws_write_string(&wb, event) && api_ws_write_object(&wb, object)) {
1239                 rc = afb_ws_binary_v(client->ws, wb.iovec, wb.count);
1240                 if (rc < 0)
1241                         ERROR("error while broadcasting event %s", event);
1242         } else
1243                 ERROR("error while broadcasting event %s", event);
1244         json_object_put(object);
1245 }
1246
1247 /******************* ws request part for server *****************/
1248
1249 /* decrement the reference count of the request and free/release it on falling to null */
1250 static void api_ws_server_req_destroy_cb(struct afb_xreq *xreq)
1251 {
1252         struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
1253
1254         afb_context_disconnect(&wreq->xreq.context);
1255         afb_cred_unref(wreq->xreq.cred);
1256         json_object_put(wreq->xreq.json);
1257         api_ws_server_client_unref(wreq->client);
1258         free(wreq);
1259 }
1260
1261 static void api_ws_server_req_success_cb(struct afb_xreq *xreq, struct json_object *obj, const char *info)
1262 {
1263         int rc;
1264         struct writebuf wb = { .count = 0 };
1265         struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
1266
1267         if (api_ws_write_char(&wb, CHAR_FOR_ANSWER_SUCCESS)
1268          && api_ws_write_uint32(&wb, wreq->msgid)
1269          && api_ws_write_uint32(&wb, (uint32_t)wreq->xreq.context.flags)
1270          && api_ws_write_string(&wb, info ? : "")
1271          && api_ws_write_object(&wb, obj)) {
1272                 rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1273                 if (rc >= 0)
1274                         goto success;
1275         }
1276         ERROR("error while sending success");
1277 success:
1278         json_object_put(obj);
1279 }
1280
1281 static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status, const char *info)
1282 {
1283         int rc;
1284         struct writebuf wb = { .count = 0 };
1285         struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
1286
1287         if (api_ws_write_char(&wb, CHAR_FOR_ANSWER_FAIL)
1288          && api_ws_write_uint32(&wb, wreq->msgid)
1289          && api_ws_write_uint32(&wb, (uint32_t)wreq->xreq.context.flags)
1290          && api_ws_write_string(&wb, status)
1291          && api_ws_write_string(&wb, info ? : "")) {
1292                 rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1293                 if (rc >= 0)
1294                         return;
1295         }
1296         ERROR("error while sending fail");
1297 }
1298
1299 static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure)
1300 {
1301         int rc;
1302         struct writebuf wb = { .count = 0 };
1303         struct api_ws_subcall *sc, *osc;
1304         struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
1305         struct api_ws_client *client = wreq->client;
1306
1307         sc = malloc(sizeof *sc);
1308         if (!sc) {
1309
1310         } else {
1311                 sc->callback = callback;
1312                 sc->closure = cb_closure;
1313
1314                 pthread_mutex_unlock(&client->mutex);
1315                 sc->subcallid = (uint32_t)(((intptr_t)sc) >> 6);
1316                 do {
1317                         sc->subcallid++;
1318                         osc = client->subcalls;
1319                         while(osc && osc->subcallid != sc->subcallid)
1320                                 osc = osc->next;
1321                 } while (osc);
1322                 sc->next = client->subcalls;
1323                 client->subcalls = sc;
1324                 pthread_mutex_unlock(&client->mutex);
1325
1326                 if (api_ws_write_char(&wb, CHAR_FOR_SUBCALL_CALL)
1327                  && api_ws_write_uint32(&wb, wreq->msgid)
1328                  && api_ws_write_uint32(&wb, sc->subcallid)
1329                  && api_ws_write_string(&wb, api)
1330                  && api_ws_write_string(&wb, verb)
1331                  && api_ws_write_object(&wb, args)) {
1332                         rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1333                         if (rc >= 0)
1334                                 return;
1335                 }
1336                 ERROR("error while sending fail");
1337         }
1338 }
1339
1340 static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event)
1341 {
1342         int rc, rc2;
1343         struct writebuf wb = { .count = 0 };
1344         struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
1345
1346         rc = afb_evt_add_watch(wreq->client->listener, event);
1347         if (rc < 0)
1348                 return rc;
1349
1350         if (api_ws_write_char(&wb, CHAR_FOR_EVT_SUBSCRIBE)
1351          && api_ws_write_uint32(&wb, wreq->msgid)
1352          && api_ws_write_uint32(&wb, (uint32_t)afb_evt_event_id(event))
1353          && api_ws_write_string(&wb, afb_evt_event_name(event))) {
1354                 rc2 = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1355                 if (rc2 >= 0)
1356                         goto success;
1357         }
1358         ERROR("error while subscribing event");
1359 success:
1360         return rc;
1361 }
1362
1363 static int api_ws_server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event)
1364 {
1365         int rc, rc2;
1366         struct writebuf wb = { .count = 0 };
1367         struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
1368
1369         if (api_ws_write_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE)
1370          && api_ws_write_uint32(&wb, wreq->msgid)
1371          && api_ws_write_uint32(&wb, (uint32_t)afb_evt_event_id(event))
1372          && api_ws_write_string(&wb, afb_evt_event_name(event))) {
1373                 rc2 = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1374                 if (rc2 >= 0)
1375                         goto success;
1376         }
1377         ERROR("error while subscribing event");
1378 success:
1379         rc = afb_evt_remove_watch(wreq->client->listener, event);
1380         return rc;
1381 }
1382
1383 /******************* server part **********************************/
1384
1385 static int api_ws_server_connect(struct api_ws *api);
1386
1387 static int api_ws_server_listen_callback(sd_event_source *src, int fd, uint32_t revents, void *closure)
1388 {
1389         if ((revents & EPOLLIN) != 0)
1390                 api_ws_server_accept(closure);
1391         if ((revents & EPOLLHUP) != 0)
1392                 api_ws_server_connect(closure);
1393         return 0;
1394 }
1395
1396 static void api_ws_server_disconnect(struct api_ws *api)
1397 {
1398         if (api->server.listensrc != NULL) {
1399                 sd_event_source_unref(api->server.listensrc);
1400                 api->server.listensrc = NULL;
1401         }
1402         if (api->fd >= 0) {
1403                 close(api->fd);
1404                 api->fd = -1;
1405         }
1406 }
1407
1408 static int api_ws_server_connect(struct api_ws *api)
1409 {
1410         int rc;
1411
1412         /* ensure disconnected */
1413         api_ws_server_disconnect(api);
1414
1415         /* request the service object name */
1416         api->fd = api_ws_socket(api->path, 1);
1417         if (api->fd < 0)
1418                 ERROR("can't create socket %s", api->path);
1419         else {
1420                 /* listen for service */
1421                 rc = sd_event_add_io(afb_common_get_event_loop(), &api->server.listensrc, api->fd, EPOLLIN, api_ws_server_listen_callback, api);
1422                 if (rc >= 0)
1423                         return 0;
1424                 close(api->fd);
1425                 errno = -rc;
1426                 ERROR("can't add ws object %s", api->path);
1427         }
1428         return -1;
1429 }
1430
/*
 * Creates the ws service designated by 'path': creates the api
 * object and makes it listen for clients.
 *
 * Returns 0 on success or -1 on failure, the api object being freed
 * when the listening can't be set.
 */
int afb_api_ws_add_server(const char *path)
{
        struct api_ws *api;

        /* creates the ws api object */
        api = api_ws_make(path);
        if (api == NULL)
                return -1;

        /* connect for serving */
        if (api_ws_server_connect(api) >= 0)
                return 0;

        free(api);
        return -1;
}
1454
1455