allow abstract unix sockets
[src/app-framework-binder.git] / src / afb-api-ws.c
1 /*
2  * Copyright (C) 2015, 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19 #define NO_PLUGIN_VERBOSE_MACRO
20
21 #include <stdlib.h>
22 #include <string.h>
23 #include <assert.h>
24 #include <fcntl.h>
25 #include <unistd.h>
26 #include <errno.h>
27 #include <endian.h>
28 #include <netdb.h>
29 #include <sys/types.h>
30 #include <sys/socket.h>
31 #include <sys/un.h>
32 #include <pthread.h>
33
34 #include <json-c/json.h>
35 #include <systemd/sd-event.h>
36
37 #include <afb/afb-req-itf.h>
38
39 #include "afb-common.h"
40
41 #include "afb-session.h"
42 #include "afb-cred.h"
43 #include "afb-ws.h"
44 #include "afb-msg-json.h"
45 #include "afb-api.h"
46 #include "afb-apiset.h"
47 #include "afb-api-so.h"
48 #include "afb-context.h"
49 #include "afb-evt.h"
50 #include "afb-xreq.h"
51 #include "verbose.h"
52 #include "sd-fds.h"
53
54 struct api_ws_memo;
55 struct api_ws_event;
56 struct api_ws_client;
57
58 #define CHAR_FOR_CALL             'C'
59 #define CHAR_FOR_ANSWER_SUCCESS   'T'
60 #define CHAR_FOR_ANSWER_FAIL      'F'
61 #define CHAR_FOR_EVT_BROADCAST    '*'
62 #define CHAR_FOR_EVT_ADD          '+'
63 #define CHAR_FOR_EVT_DEL          '-'
64 #define CHAR_FOR_EVT_PUSH         '!'
65 #define CHAR_FOR_EVT_SUBSCRIBE    'S'
66 #define CHAR_FOR_EVT_UNSUBSCRIBE  'U'
67 #define CHAR_FOR_SUBCALL_CALL     'B'
68 #define CHAR_FOR_SUBCALL_REPLY    'R'
69
70 /*
71  */
72 struct api_ws
73 {
74         char *path;             /* path of the object for the API */
75         char *api;              /* api name of the interface */
76         int fd;                 /* file descriptor */
77         pthread_mutex_t mutex;  /**< resource control */
78         union {
79                 struct {
80                         struct afb_ws *ws;
81                         struct api_ws_event *events;
82                         struct api_ws_memo *memos;
83                 } client;
84                 struct {
85                         sd_event_source *listensrc; /**< systemd source for server socket */
86                         struct afb_apiset *apiset;
87                 } server;
88         };
89 };
90
91 #define RETOK   1
92 #define RETERR  2
93 #define RETRAW  3
94
95 /******************* common usefull tools **********************************/
96
97 /**
98  * translate a pointer to some integer
99  * @param ptr the pointer to translate
100  * @return an integer
101  */
102 static inline uint32_t ptr2id(void *ptr)
103 {
104         return (uint32_t)(((intptr_t)ptr) >> 6);
105 }
106
107 /******************* websocket interface for client part **********************************/
108
109 static void api_ws_client_on_binary(void *closure, char *data, size_t size);
110
111 static const struct afb_ws_itf api_ws_client_ws_itf =
112 {
113         .on_close = NULL,
114         .on_text = NULL,
115         .on_binary = api_ws_client_on_binary,
116         .on_error = NULL,
117         .on_hangup = NULL
118 };
119
120 /******************* event structures for server part **********************************/
121
122 static void api_ws_server_event_add(void *closure, const char *event, int eventid);
123 static void api_ws_server_event_remove(void *closure, const char *event, int eventid);
124 static void api_ws_server_event_push(void *closure, const char *event, int eventid, struct json_object *object);
125 static void api_ws_server_event_broadcast(void *closure, const char *event, int eventid, struct json_object *object);
126
127 /* the interface for events pushing */
128 static const struct afb_evt_itf api_ws_server_evt_itf = {
129         .broadcast = api_ws_server_event_broadcast,
130         .push = api_ws_server_event_push,
131         .add = api_ws_server_event_add,
132         .remove = api_ws_server_event_remove
133 };
134
135 /******************* handling subcalls *****************************/
136
137 /**
138  * Structure on server side for recording pending
139  * subcalls.
140  */
141 struct api_ws_subcall
142 {
143         struct api_ws_subcall *next;    /**< next subcall for the client */
144         uint32_t subcallid;             /**< the subcallid */
145         void (*callback)(void*, int, struct json_object*); /**< callback on completion */
146         void *closure;                  /**< closure of the callback */
147 };
148
149 /**
150  * Structure for sending back replies on client side
151  */
152 struct api_ws_reply
153 {
154         struct api_ws *apiws;   /**< api descriptor */
155         uint32_t subcallid;     /**< subcallid for the reply */
156 };
157
158 /******************* client description part for server *****************************/
159
160 struct api_ws_client
161 {
162         /* the server ws-api */
163         const char *api;
164
165         /* count of references */
166         int refcount;
167
168         /* file descriptor */
169         int fd;
170
171         /* resource control */
172         pthread_mutex_t mutex;
173
174         /* listener for events */
175         struct afb_evt_listener *listener;
176
177         /* websocket */
178         struct afb_ws *ws;
179
180         /* credentials */
181         struct afb_cred *cred;
182
183         /* pending subcalls */
184         struct api_ws_subcall *subcalls;
185
186         /* apiset */
187         struct afb_apiset *apiset;
188 };
189
190 /******************* websocket interface for client part **********************************/
191
192 static void api_ws_server_on_binary(void *closure, char *data, size_t size);
193 static void api_ws_server_on_hangup(void *closure);
194
195 static const struct afb_ws_itf api_ws_server_ws_itf =
196 {
197         .on_close = NULL,
198         .on_text = NULL,
199         .on_binary = api_ws_server_on_binary,
200         .on_error = NULL,
201         .on_hangup = api_ws_server_on_hangup
202 };
203
204 /******************* ws request part for server *****************/
205
206 /*
207  * structure for a ws request
208  */
209 struct api_ws_server_req {
210         struct afb_xreq xreq;           /* the xreq */
211         struct api_ws_client *client;   /* the client of the request */
212         uint32_t msgid;                 /* the incoming request msgid */
213 };
214
215 static void api_ws_server_req_success_cb(struct afb_xreq *xreq, struct json_object *obj, const char *info);
216 static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status, const char *info);
217 static void api_ws_server_req_destroy_cb(struct afb_xreq *xreq);
218 static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event);
219 static int api_ws_server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event);
220 static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure);
221
222 const struct afb_xreq_query_itf afb_api_ws_xreq_itf = {
223         .success = api_ws_server_req_success_cb,
224         .fail = api_ws_server_req_fail_cb,
225         .unref = api_ws_server_req_destroy_cb,
226         .subcall = api_ws_server_req_subcall_cb,
227         .subscribe = api_ws_server_req_subscribe_cb,
228         .unsubscribe = api_ws_server_req_unsubscribe_cb
229 };
230
231 /******************* common part **********************************/
232
233 /*
234  * create a structure api_ws not connected to the 'path'.
235  */
236 static struct api_ws *api_ws_make(const char *path)
237 {
238         struct api_ws *api;
239         size_t length;
240
241         /* allocates the structure */
242         length = strlen(path);
243         api = calloc(1, sizeof *api + 1 + length);
244         if (api == NULL) {
245                 errno = ENOMEM;
246                 goto error;
247         }
248
249         /* path is copied after the struct */
250         api->path = (char*)(api+1);
251         memcpy(api->path, path, length + 1);
252
253         /* api name is at the end of the path */
254         while (length && path[length - 1] != '/' && path[length - 1] != ':')
255                 length = length - 1;
256         api->api = &api->path[length];
257         if (api->api == NULL || !afb_api_is_valid_name(api->api)) {
258                 errno = EINVAL;
259                 goto error2;
260         }
261         pthread_mutex_init(&api->mutex, NULL);
262
263         api->fd = -1;
264         return api;
265
266 error2:
267         free(api);
268 error:
269         return NULL;
270 }
271
272 static int api_ws_socket_unix(const char *path, int server)
273 {
274         int fd, rc;
275         struct sockaddr_un addr;
276         size_t length;
277
278         length = strlen(path);
279         if (length >= 108) {
280                 errno = ENAMETOOLONG;
281                 return -1;
282         }
283
284         if (server && path[0] != '@')
285                 unlink(path);
286
287         fd = socket(AF_UNIX, SOCK_STREAM, 0);
288         if (fd < 0)
289                 return fd;
290
291         memset(&addr, 0, sizeof addr);
292         addr.sun_family = AF_UNIX;
293         strcpy(addr.sun_path, path);
294         if (addr.sun_path[0] == '@')
295                 addr.sun_path[0] = 0; /* implement abstract sockets */
296         if (server) {
297                 rc = bind(fd, (struct sockaddr *) &addr, (socklen_t)(sizeof addr));
298         } else {
299                 rc = connect(fd, (struct sockaddr *) &addr, (socklen_t)(sizeof addr));
300         }
301         if (rc < 0) {
302                 close(fd);
303                 return rc;
304         }
305         return fd;
306 }
307
308 static int api_ws_socket_inet(const char *path, int server)
309 {
310         int rc, fd;
311         const char *service, *host, *api;
312         struct addrinfo hint, *rai, *iai;
313
314         /* scan the uri */
315         api = strrchr(path, '/');
316         service = strrchr(path, ':');
317         if (api == NULL || service == NULL || api < service) {
318                 errno = EINVAL;
319                 return -1;
320         }
321         host = strndupa(path, service++ - path);
322         service = strndupa(service, api - service);
323
324         /* get addr */
325         memset(&hint, 0, sizeof hint);
326         hint.ai_family = AF_INET;
327         hint.ai_socktype = SOCK_STREAM;
328         rc = getaddrinfo(host, service, &hint, &rai);
329         if (rc != 0) {
330                 errno = EINVAL;
331                 return -1;
332         }
333
334         /* get the socket */
335         iai = rai;
336         while (iai != NULL) {
337                 fd = socket(iai->ai_family, iai->ai_socktype, iai->ai_protocol);
338                 if (fd >= 0) {
339                         if (server) {
340                                 rc = bind(fd, iai->ai_addr, iai->ai_addrlen);
341                         } else {
342                                 rc = connect(fd, iai->ai_addr, iai->ai_addrlen);
343                         }
344                         if (rc == 0) {
345                                 freeaddrinfo(rai);
346                                 return fd;
347                         }
348                         close(fd);
349                 }
350                 iai = iai->ai_next;
351         }
352         freeaddrinfo(rai);
353         return -1;
354         
355 }
356
357 static int api_ws_socket(const char *path, int server)
358 {
359         int fd, rc;
360
361         /* check for systemd socket */
362         if (0 == strncmp(path, "sd:", 3))
363                 fd = sd_fds_for(path + 3);
364         else {
365                 /* check for unix socket */
366                 if (0 == strncmp(path, "unix:", 5))
367                         /* unix socket */
368                         fd = api_ws_socket_unix(path + 5, server);
369                 else
370                         /* inet socket */
371                         fd = api_ws_socket_inet(path, server);
372
373                 if (fd >= 0 && server) {
374                         rc = 1;
375                         setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &rc, sizeof rc);
376                         rc = listen(fd, 5);
377                 }
378         }
379         /* configure the socket */
380         if (fd >= 0) {
381                 fcntl(fd, F_SETFD, FD_CLOEXEC);
382                 fcntl(fd, F_SETFL, O_NONBLOCK);
383         }
384         return fd;
385 }
386
387 /******************* serialisation part **********************************/
388
389 struct readbuf
390 {
391         char *head, *end;
392 };
393
394 #define WRITEBUF_COUNT_MAX  32
395 struct writebuf
396 {
397         struct iovec iovec[WRITEBUF_COUNT_MAX];
398         uint32_t uints[WRITEBUF_COUNT_MAX];
399         int count;
400 };
401
402 static char *api_ws_read_get(struct readbuf *rb, uint32_t length)
403 {
404         char *before = rb->head;
405         char *after = before + length;
406         if (after > rb->end)
407                 return 0;
408         rb->head = after;
409         return before;
410 }
411
412 static int api_ws_read_char(struct readbuf *rb, char *value)
413 {
414         if (rb->head >= rb->end)
415                 return 0;
416         *value = *rb->head++;
417         return 1;
418 }
419
420 static int api_ws_read_uint32(struct readbuf *rb, uint32_t *value)
421 {
422         char *after = rb->head + sizeof *value;
423         if (after > rb->end)
424                 return 0;
425         memcpy(value, rb->head, sizeof *value);
426         rb->head = after;
427         *value = le32toh(*value);
428         return 1;
429 }
430
431 static int api_ws_read_string(struct readbuf *rb, const char **value, size_t *length)
432 {
433         uint32_t len;
434         if (!api_ws_read_uint32(rb, &len) || !len)
435                 return 0;
436         if (length)
437                 *length = (size_t)(len - 1);
438         return (*value = api_ws_read_get(rb, len)) != NULL &&  rb->head[-1] == 0;
439 }
440
441 static int api_ws_read_object(struct readbuf *rb, struct json_object **object)
442 {
443         const char *string;
444         struct json_object *o;
445         int rc = api_ws_read_string(rb, &string, NULL);
446         if (rc) {
447                 o = json_tokener_parse(string);
448                 if (o == NULL && strcmp(string, "null"))
449                         o = json_object_new_string(string);
450                 *object = o;
451         }
452         return rc;
453 }
454
455 static int api_ws_write_put(struct writebuf *wb, const void *value, size_t length)
456 {
457         int i = wb->count;
458         if (i == WRITEBUF_COUNT_MAX)
459                 return 0;
460         wb->iovec[i].iov_base = (void*)value;
461         wb->iovec[i].iov_len = length;
462         wb->count = i + 1;
463         return 1;
464 }
465
466 static int api_ws_write_char(struct writebuf *wb, char value)
467 {
468         int i = wb->count;
469         if (i == WRITEBUF_COUNT_MAX)
470                 return 0;
471         *(char*)&wb->uints[i] = value;
472         wb->iovec[i].iov_base = &wb->uints[i];
473         wb->iovec[i].iov_len = 1;
474         wb->count = i + 1;
475         return 1;
476 }
477
478 static int api_ws_write_uint32(struct writebuf *wb, uint32_t value)
479 {
480         int i = wb->count;
481         if (i == WRITEBUF_COUNT_MAX)
482                 return 0;
483         wb->uints[i] = htole32(value);
484         wb->iovec[i].iov_base = &wb->uints[i];
485         wb->iovec[i].iov_len = sizeof wb->uints[i];
486         wb->count = i + 1;
487         return 1;
488 }
489
490 static int api_ws_write_string_length(struct writebuf *wb, const char *value, size_t length)
491 {
492         uint32_t len = (uint32_t)++length;
493         return (size_t)len == length && len && api_ws_write_uint32(wb, len) && api_ws_write_put(wb, value, length);
494 }
495
496 static int api_ws_write_string(struct writebuf *wb, const char *value)
497 {
498         return api_ws_write_string_length(wb, value, strlen(value));
499 }
500
501 static int api_ws_write_object(struct writebuf *wb, struct json_object *object)
502 {
503         const char *string = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
504         return string != NULL && api_ws_write_string(wb, string);
505 }
506
507 /******************* client part **********************************/
508
509 /*
510  * structure for recording query data
511  */
512 struct api_ws_memo {
513         struct api_ws_memo *next;       /* the next memo */
514         struct api_ws *api;             /* the ws api */
515         struct afb_xreq *xreq;          /* the request handle */
516         uint32_t msgid;                 /* the message identifier */
517 };
518
519 struct api_ws_event
520 {
521         struct api_ws_event *next;
522         struct afb_event event;
523         int eventid;
524         int refcount;
525 };
526
527 /* search a memorized request */
528 static struct api_ws_memo *api_ws_client_memo_search(struct api_ws *api, uint32_t msgid)
529 {
530         struct api_ws_memo *memo;
531
532         memo = api->client.memos;
533         while (memo != NULL && memo->msgid != msgid)
534                 memo = memo->next;
535
536         return memo;
537 }
538
539 /* search the event */
540 static struct api_ws_event *api_ws_client_event_search(struct api_ws *api, uint32_t eventid, const char *name)
541 {
542         struct api_ws_event *ev;
543
544         ev = api->client.events;
545         while (ev != NULL && (ev->eventid != eventid || 0 != strcmp(afb_evt_event_name(ev->event), name)))
546                 ev = ev->next;
547
548         return ev;
549 }
550
551
552 /* allocates and init the memorizing data */
553 static struct api_ws_memo *api_ws_client_memo_make(struct api_ws *api, struct afb_xreq *xreq)
554 {
555         struct api_ws_memo *memo;
556
557         memo = malloc(sizeof *memo);
558         if (memo != NULL) {
559                 afb_xreq_addref(xreq);
560                 memo->xreq = xreq;
561                 memo->msgid = ptr2id(memo);
562                 while(api_ws_client_memo_search(api, memo->msgid) != NULL)
563                         memo->msgid++;
564                 memo->api = api;
565                 memo->next = api->client.memos;
566                 api->client.memos = memo;
567         }
568         return memo;
569 }
570
571 /* free and release the memorizing data */
572 static void api_ws_client_memo_destroy(struct api_ws_memo *memo)
573 {
574         struct api_ws_memo **prv;
575
576         prv = &memo->api->client.memos;
577         while (*prv != NULL) {
578                 if (*prv == memo) {
579                         *prv = memo->next;
580                         break;
581                 }
582                 prv = &(*prv)->next;
583         }
584
585         afb_xreq_unref(memo->xreq);
586         free(memo);
587 }
588
589 /* get event data from the message */
590 static int api_ws_client_msg_event_read(struct readbuf *rb, uint32_t *eventid, const char **name)
591 {
592         return api_ws_read_uint32(rb, eventid) && api_ws_read_string(rb, name, NULL);
593 }
594
595 /* get event from the message */
596 static int api_ws_client_msg_event_get(struct api_ws *api, struct readbuf *rb, struct api_ws_event **ev)
597 {
598         const char *name;
599         uint32_t eventid;
600
601         /* get event data from the message */
602         if (!api_ws_client_msg_event_read(rb, &eventid, &name)) {
603                 ERROR("Invalid message");
604                 return 0;
605         }
606
607         /* check conflicts */
608         *ev = api_ws_client_event_search(api, eventid, name);
609         if (*ev == NULL) {
610                 ERROR("event %s not found", name);
611                 return 0;
612         }
613
614         return 1;
615 }
616
617 /* get event from the message */
618 static int api_ws_client_msg_memo_get(struct api_ws *api, struct readbuf *rb, struct api_ws_memo **memo)
619 {
620         uint32_t msgid;
621
622         /* get event data from the message */
623         if (!api_ws_read_uint32(rb, &msgid)) {
624                 ERROR("Invalid message");
625                 return 0;
626         }
627
628         /* get the memo */
629         *memo = api_ws_client_memo_search(api, msgid);
630         if (*memo == NULL) {
631                 ERROR("message not found");
632                 return 0;
633         }
634
635         return 1;
636 }
637
638 /* read a subscrition message */
639 static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_memo **memo, struct api_ws_event **ev)
640 {
641         return api_ws_client_msg_memo_get(api, rb, memo) && api_ws_client_msg_event_get(api, rb, ev);
642 }
643
644 /* adds an event */
645 static void api_ws_client_event_create(struct api_ws *api, struct readbuf *rb)
646 {
647         size_t offset;
648         const char *name;
649         uint32_t eventid;
650         struct api_ws_event *ev;
651
652         /* get event data from the message */
653         offset = api_ws_client_msg_event_read(rb, &eventid, &name);
654         if (offset == 0) {
655                 ERROR("Invalid message");
656                 return;
657         }
658
659         /* check conflicts */
660         ev = api_ws_client_event_search(api, eventid, name);
661         if (ev != NULL) {
662                 ev->refcount++;
663                 return;
664         }
665
666         /* no conflict, try to add it */
667         ev = malloc(sizeof *ev);
668         if (ev != NULL) {
669                 ev->event = afb_evt_create_event(name);
670                 if (ev->event.closure == NULL)
671                         free(ev);
672                 else {
673                         ev->refcount = 1;
674                         ev->eventid = eventid;
675                         ev->next = api->client.events;
676                         api->client.events = ev;
677                         return;
678                 }
679         }
680         ERROR("can't create event %s, out of memory", name);
681 }
682
683 /* removes an event */
684 static void api_ws_client_event_drop(struct api_ws *api, struct readbuf *rb)
685 {
686         struct api_ws_event *ev, **prv;
687
688         /* retrieves the event */
689         if (!api_ws_client_msg_event_get(api, rb, &ev))
690                 return;
691
692         /* decrease the reference count */
693         if (--ev->refcount)
694                 return;
695
696         /* unlinks the event */
697         prv = &api->client.events;
698         while (*prv != ev)
699                 prv = &(*prv)->next;
700         *prv = ev->next;
701
702         /* destroys the event */
703         afb_event_drop(ev->event);
704         free(ev);
705 }
706
707 /* subscribes an event */
708 static void api_ws_client_event_subscribe(struct api_ws *api, struct readbuf *rb)
709 {
710         struct api_ws_event *ev;
711         struct api_ws_memo *memo;
712
713         if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) {
714                 /* subscribe the request from the event */
715                 if (afb_xreq_subscribe(memo->xreq, ev->event) < 0)
716                         ERROR("can't subscribe: %m");
717         }
718 }
719
720 /* unsubscribes an event */
721 static void api_ws_client_event_unsubscribe(struct api_ws *api, struct readbuf *rb)
722 {
723         struct api_ws_event *ev;
724         struct api_ws_memo *memo;
725
726         if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) {
727                 /* unsubscribe the request from the event */
728                 if (afb_xreq_unsubscribe(memo->xreq, ev->event) < 0)
729                         ERROR("can't unsubscribe: %m");
730         }
731 }
732
733 /* receives broadcasted events */
734 static void api_ws_client_event_broadcast(struct api_ws *api, struct readbuf *rb)
735 {
736         struct json_object *object;
737         const char *event;
738
739         if (api_ws_read_string(rb, &event, NULL) && api_ws_read_object(rb, &object))
740                 afb_evt_broadcast(event, object);
741         else
742                 ERROR("unreadable broadcasted event");
743 }
744
745 /* pushs an event */
746 static void api_ws_client_event_push(struct api_ws *api, struct readbuf *rb)
747 {
748         struct api_ws_event *ev;
749         struct json_object *object;
750
751         if (api_ws_client_msg_event_get(api, rb, &ev) && api_ws_read_object(rb, &object))
752                 afb_event_push(ev->event, object);
753         else
754                 ERROR("unreadable push event");
755 }
756
757 static void api_ws_client_reply_success(struct api_ws *api, struct readbuf *rb)
758 {
759         struct api_ws_memo *memo;
760         struct json_object *object;
761         const char *info;
762         uint32_t flags;
763
764         /* retrieve the message data */
765         if (!api_ws_client_msg_memo_get(api, rb, &memo))
766                 return;
767
768         if (api_ws_read_uint32(rb, &flags)
769          && api_ws_read_string(rb, &info, NULL)
770          && api_ws_read_object(rb, &object)) {
771                 memo->xreq->context.flags = (unsigned)flags;
772                 afb_xreq_success(memo->xreq, object, *info ? info : NULL);
773         } else {
774                 /* failing to have the answer */
775                 afb_xreq_fail(memo->xreq, "error", "ws error");
776         }
777         api_ws_client_memo_destroy(memo);
778 }
779
780 static void api_ws_client_reply_fail(struct api_ws *api, struct readbuf *rb)
781 {
782         struct api_ws_memo *memo;
783         const char *info, *status;
784         uint32_t flags;
785
786         /* retrieve the message data */
787         if (!api_ws_client_msg_memo_get(api, rb, &memo))
788                 return;
789
790         if (api_ws_read_uint32(rb, &flags)
791          && api_ws_read_string(rb, &status, NULL)
792          && api_ws_read_string(rb, &info, NULL)) {
793                 memo->xreq->context.flags = (unsigned)flags;
794                 afb_xreq_fail(memo->xreq, status, *info ? info : NULL);
795         } else {
796                 /* failing to have the answer */
797                 afb_xreq_fail(memo->xreq, "error", "ws error");
798         }
799         api_ws_client_memo_destroy(memo);
800 }
801
802 /* send a subcall reply */
803 static void api_ws_client_send_subcall_reply(struct api_ws_reply *reply, int iserror, json_object *object)
804 {
805         int rc;
806         struct writebuf wb = { .count = 0 };
807         char ie = (char)!!iserror;
808
809         if (!api_ws_write_char(&wb, CHAR_FOR_SUBCALL_REPLY)
810          || !api_ws_write_uint32(&wb, reply->subcallid)
811          || !api_ws_write_char(&wb, ie)
812          || !api_ws_write_object(&wb, object)) {
813                 /* write error ? */
814                 return;
815         }
816
817         rc = afb_ws_binary_v(reply->apiws->client.ws, wb.iovec, wb.count);
818         if (rc >= 0)
819                 return;
820         ERROR("error while sending subcall reply");
821 }
822
823 /* callback for subcall reply */
824 static void api_ws_client_subcall_reply_cb(void *closure, int iserror, json_object *object)
825 {
826         api_ws_client_send_subcall_reply(closure, iserror, object);
827         free(closure);
828 }
829
830 /* received a subcall request */
831 static void api_ws_client_subcall(struct api_ws *apiws, struct readbuf *rb)
832 {
833         struct api_ws_reply *reply;
834         struct api_ws_memo *memo;
835         const char *api, *verb;
836         uint32_t subcallid;
837         struct json_object *object;
838
839         reply = malloc(sizeof *reply);
840         if (!reply)
841                 return;
842
843         /* retrieve the message data */
844         if (!api_ws_client_msg_memo_get(apiws, rb, &memo))
845                 return;
846
847         if (api_ws_read_uint32(rb, &subcallid)
848          && api_ws_read_string(rb, &api, NULL)
849          && api_ws_read_string(rb, &verb, NULL)
850          && api_ws_read_object(rb, &object)) {
851                 reply->apiws = apiws;
852                 reply->subcallid = subcallid;
853                 afb_xreq_subcall(memo->xreq, api, verb, object, api_ws_client_subcall_reply_cb, reply);
854         }
855 }
856
857 /* callback when receiving binary data */
858 static void api_ws_client_on_binary(void *closure, char *data, size_t size)
859 {
860         if (size > 0) {
861                 struct api_ws *apiws = closure;
862                 struct readbuf rb = { .head = data, .end = data + size };
863
864                 pthread_mutex_lock(&apiws->mutex);
865                 switch (*rb.head++) {
866                 case CHAR_FOR_ANSWER_SUCCESS: /* success */
867                         api_ws_client_reply_success(apiws, &rb);
868                         break;
869                 case CHAR_FOR_ANSWER_FAIL: /* fail */
870                         api_ws_client_reply_fail(apiws, &rb);
871                         break;
872                 case CHAR_FOR_EVT_BROADCAST: /* broadcast */
873                         api_ws_client_event_broadcast(apiws, &rb);
874                         break;
875                 case CHAR_FOR_EVT_ADD: /* creates the event */
876                         api_ws_client_event_create(apiws, &rb);
877                         break;
878                 case CHAR_FOR_EVT_DEL: /* drops the event */
879                         api_ws_client_event_drop(apiws, &rb);
880                         break;
881                 case CHAR_FOR_EVT_PUSH: /* pushs the event */
882                         api_ws_client_event_push(apiws, &rb);
883                         break;
884                 case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
885                         api_ws_client_event_subscribe(apiws, &rb);
886                         break;
887                 case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
888                         api_ws_client_event_unsubscribe(apiws, &rb);
889                         break;
890                 case CHAR_FOR_SUBCALL_CALL: /* subcall */
891                         api_ws_client_subcall(apiws, &rb);
892                         break;
893                 default: /* unexpected message */
894                         /* TODO: close the connection */
895                         break;
896                 }
897                 pthread_mutex_unlock(&apiws->mutex);
898         }
899         free(data);
900 }
901
902 /* on call, propagate it to the ws service */
903 static void api_ws_client_call_cb(void * closure, struct afb_xreq *xreq)
904 {
905         int rc;
906         struct api_ws_memo *memo;
907         struct writebuf wb = { .count = 0 };
908         const char *raw;
909         size_t szraw;
910         struct api_ws *apiws = closure;
911
912         pthread_mutex_lock(&apiws->mutex);
913
914         /* create the recording data */
915         memo = api_ws_client_memo_make(apiws, xreq);
916         if (memo == NULL) {
917                 afb_xreq_fail_f(xreq, "error", "out of memory");
918                 goto end;
919         }
920
921         /* creates the call message */
922         raw = afb_xreq_raw(xreq, &szraw);
923         if (raw == NULL)
924                 goto internal_error;
925         if (!api_ws_write_char(&wb, CHAR_FOR_CALL)
926          || !api_ws_write_uint32(&wb, memo->msgid)
927          || !api_ws_write_uint32(&wb, (uint32_t)xreq->context.flags)
928          || !api_ws_write_string(&wb, xreq->verb)
929          || !api_ws_write_string(&wb, afb_session_uuid(xreq->context.session))
930          || !api_ws_write_string_length(&wb, raw, szraw))
931                 goto overflow;
932
933         /* send */
934         rc = afb_ws_binary_v(apiws->client.ws, wb.iovec, wb.count);
935         if (rc >= 0)
936                 goto end;
937
938         afb_xreq_fail(xreq, "error", "websocket sending error");
939         goto clean_memo;
940
941 internal_error:
942         afb_xreq_fail(xreq, "error", "internal: raw is NULL!");
943         goto clean_memo;
944
945 overflow:
946         afb_xreq_fail(xreq, "error", "overflow: size doesn't match 32 bits!");
947
948 clean_memo:
949         api_ws_client_memo_destroy(memo);
950 end:
951         pthread_mutex_unlock(&apiws->mutex);
952 }
953
954 /*  */
955 static void api_ws_client_disconnect(struct api_ws *api)
956 {
957         if (api->fd >= 0) {
958                 afb_ws_destroy(api->client.ws);
959                 api->client.ws = NULL;
960                 close(api->fd);
961                 api->fd = -1;
962         }
963 }
964
965 /*  */
966 static int api_ws_client_connect(struct api_ws *api)
967 {
968         struct afb_ws *ws;
969         int fd;
970
971         fd = api_ws_socket(api->path, 0);
972         if (fd >= 0) {
973                 ws = afb_ws_create(afb_common_get_event_loop(), fd, &api_ws_client_ws_itf, api);
974                 if (ws != NULL) {
975                         api->client.ws = ws;
976                         api->fd = fd;
977                         return 0;
978                 }
979                 close(fd);
980         }
981         return -1;
982 }
983
984 static struct afb_api_itf ws_api_itf = {
985         .call = api_ws_client_call_cb
986 };
987
988 /* adds a afb-ws-service client api */
989 int afb_api_ws_add_client(const char *path, struct afb_apiset *apiset)
990 {
991         int rc;
992         struct api_ws *api;
993         struct afb_api afb_api;
994
995         /* create the ws client api */
996         api = api_ws_make(path);
997         if (api == NULL)
998                 goto error;
999
1000         /* connect to the service */
1001         rc = api_ws_client_connect(api);
1002         if (rc < 0) {
1003                 ERROR("can't connect to ws service %s", api->path);
1004                 goto error2;
1005         }
1006
1007         /* record it as an API */
1008         afb_api.closure = api;
1009         afb_api.itf = &ws_api_itf;
1010         if (afb_apiset_add(apiset, api->api, afb_api) < 0)
1011                 goto error3;
1012
1013         return 0;
1014
1015 error3:
1016         api_ws_client_disconnect(api);
1017 error2:
1018         free(api);
1019 error:
1020         return -1;
1021 }
1022
1023 /******************* client description part for server *****************************/
1024
1025 static void api_ws_server_client_unref(struct api_ws_client *client)
1026 {
1027         struct api_ws_subcall *sc, *nsc;
1028
1029         if (!__atomic_sub_fetch(&client->refcount, 1, __ATOMIC_RELAXED)) {
1030                 afb_evt_listener_unref(client->listener);
1031                 afb_ws_destroy(client->ws);
1032                 nsc = client->subcalls;
1033                 while (nsc) {
1034                         sc= nsc;
1035                         nsc = sc->next;
1036                         sc->callback(sc->closure, 1, NULL);
1037                         free(sc);
1038                 }
1039                 afb_cred_unref(client->cred);
1040                 afb_apiset_unref(client->apiset);
1041                 free(client);
1042         }
1043 }
1044
1045 static void api_ws_server_client_addref(struct api_ws_client *client)
1046 {
1047         __atomic_add_fetch(&client->refcount, 1, __ATOMIC_RELAXED);
1048 }
1049
1050 /* on call, propagate it to the ws service */
1051 static void api_ws_server_on_call(struct api_ws_client *client, struct readbuf *rb)
1052 {
1053         struct api_ws_server_req *wreq;
1054         char *cverb;
1055         const char *uuid, *verb;
1056         uint32_t flags, msgid;
1057         size_t lenverb;
1058         struct json_object *object;
1059
1060         api_ws_server_client_addref(client);
1061
1062         /* reads the call message data */
1063         if (!api_ws_read_uint32(rb, &msgid)
1064          || !api_ws_read_uint32(rb, &flags)
1065          || !api_ws_read_string(rb, &verb, &lenverb)
1066          || !api_ws_read_string(rb, &uuid, NULL)
1067          || !api_ws_read_object(rb, &object))
1068                 goto overflow;
1069
1070         /* create the request */
1071         wreq = malloc(++lenverb + sizeof *wreq);
1072         if (wreq == NULL)
1073                 goto out_of_memory;
1074
1075         afb_xreq_init(&wreq->xreq, &afb_api_ws_xreq_itf);
1076         wreq->client = client;
1077         wreq->msgid = msgid;
1078         cverb = (char*)&wreq[1];
1079         memcpy(cverb, verb, lenverb);
1080
1081         /* init the context */
1082         if (afb_context_connect(&wreq->xreq.context, uuid, NULL) < 0)
1083                 goto unconnected;
1084         wreq->xreq.context.flags = flags;
1085
1086         /* makes the call */
1087         wreq->xreq.cred = afb_cred_addref(client->cred);
1088         wreq->xreq.api = client->api;
1089         wreq->xreq.verb = cverb;
1090         wreq->xreq.json = object;
1091         afb_xreq_process(&wreq->xreq, client->apiset);
1092         return;
1093
1094 unconnected:
1095         free(wreq);
1096 out_of_memory:
1097         json_object_put(object);
1098 overflow:
1099         api_ws_server_client_unref(client);
1100 }
1101
1102 /* on subcall reply */
1103 static void api_ws_server_on_subcall_reply(struct api_ws_client *client, struct readbuf *rb)
1104 {
1105         char iserror;
1106         uint32_t subcallid;
1107         struct json_object *object;
1108         struct api_ws_subcall *sc, **psc;
1109
1110         /* reads the call message data */
1111         if (!api_ws_read_uint32(rb, &subcallid)
1112          || !api_ws_read_char(rb, &iserror)
1113          || !api_ws_read_object(rb, &object)) {
1114                 /* TODO bad protocol */
1115                 return;
1116         }
1117
1118         /* search the subcall and unlink it */
1119         pthread_mutex_lock(&client->mutex);
1120         psc = &client->subcalls;
1121         while ((sc = *psc) && sc->subcallid != subcallid)
1122                 psc = &sc->next;
1123         if (!sc) {
1124                 pthread_mutex_unlock(&client->mutex);
1125                 /* TODO subcall not found */
1126         } else {
1127                 *psc = sc->next;
1128                 pthread_mutex_unlock(&client->mutex);
1129                 sc->callback(sc->closure, (int)iserror, object);
1130                 free(sc);
1131         }
1132         json_object_put(object);
1133 }
1134
1135 /* callback when receiving binary data */
1136 static void api_ws_server_on_binary(void *closure, char *data, size_t size)
1137 {
1138         if (size > 0) {
1139                 struct readbuf rb = { .head = data, .end = data + size };
1140                 switch (*rb.head++) {
1141                 case CHAR_FOR_CALL:
1142                         api_ws_server_on_call(closure, &rb);
1143                         break;
1144                 case CHAR_FOR_SUBCALL_REPLY:
1145                         api_ws_server_on_subcall_reply(closure, &rb);
1146                         break;
1147                 default: /* unexpected message */
1148                         /* TODO: close the connection */
1149                         break;
1150                 }
1151         }
1152         free(data);
1153 }
1154
1155 /* callback when receiving a hangup */
1156 static void api_ws_server_on_hangup(void *closure)
1157 {
1158         struct api_ws_client *client = closure;
1159
1160         /* close the socket */
1161         if (client->fd >= 0) {
1162                 close(client->fd);
1163                 client->fd = -1;
1164         }
1165
1166         /* release the client */
1167         api_ws_server_client_unref(client);
1168 }
1169
1170 static void api_ws_server_accept(struct api_ws *api)
1171 {
1172         struct api_ws_client *client;
1173         struct sockaddr addr;
1174         socklen_t lenaddr;
1175
1176         client = calloc(1, sizeof *client);
1177         if (client != NULL) {
1178                 client->listener = afb_evt_listener_create(&api_ws_server_evt_itf, client);
1179                 if (client->listener != NULL) {
1180                         lenaddr = (socklen_t)sizeof addr;
1181                         client->fd = accept(api->fd, &addr, &lenaddr);
1182                         if (client->fd >= 0) {
1183                                 client->cred = afb_cred_create_for_socket(client->fd);
1184                                 fcntl(client->fd, F_SETFD, FD_CLOEXEC);
1185                                 fcntl(client->fd, F_SETFL, O_NONBLOCK);
1186                                 client->ws = afb_ws_create(afb_common_get_event_loop(), client->fd, &api_ws_server_ws_itf, client);
1187                                 if (client->ws != NULL) {
1188                                         client->api = api->api;
1189                                         client->apiset = afb_apiset_addref(api->server.apiset);
1190                                         client->refcount = 1;
1191                                         client->subcalls = NULL;
1192                                         return;
1193                                 }
1194                                 afb_cred_unref(client->cred);
1195                                 close(client->fd);
1196                         }
1197                         afb_evt_listener_unref(client->listener);
1198                 }
1199                 free(client);
1200         }
1201 }
1202
1203 /******************* server part: manage events **********************************/
1204
1205 static void api_ws_server_event_send(struct api_ws_client *client, char order, const char *event, int eventid, const char *data)
1206 {
1207         int rc;
1208         struct writebuf wb = { .count = 0 };
1209
1210         if (api_ws_write_char(&wb, order)
1211          && api_ws_write_uint32(&wb, eventid)
1212          && api_ws_write_string(&wb, event)
1213          && (data == NULL || api_ws_write_string(&wb, data))) {
1214                 rc = afb_ws_binary_v(client->ws, wb.iovec, wb.count);
1215                 if (rc >= 0)
1216                         return;
1217         }
1218         ERROR("error while sending %c for event %s", order, event);
1219 }
1220
1221 static void api_ws_server_event_add(void *closure, const char *event, int eventid)
1222 {
1223         api_ws_server_event_send(closure, CHAR_FOR_EVT_ADD, event, eventid, NULL);
1224 }
1225
1226 static void api_ws_server_event_remove(void *closure, const char *event, int eventid)
1227 {
1228         api_ws_server_event_send(closure, CHAR_FOR_EVT_DEL, event, eventid, NULL);
1229 }
1230
1231 static void api_ws_server_event_push(void *closure, const char *event, int eventid, struct json_object *object)
1232 {
1233         const char *data = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
1234         api_ws_server_event_send(closure, CHAR_FOR_EVT_PUSH, event, eventid, data ? : "null");
1235         json_object_put(object);
1236 }
1237
1238 static void api_ws_server_event_broadcast(void *closure, const char *event, int eventid, struct json_object *object)
1239 {
1240         int rc;
1241         struct api_ws_client *client = closure;
1242
1243         struct writebuf wb = { .count = 0 };
1244
1245         if (api_ws_write_char(&wb, CHAR_FOR_EVT_BROADCAST) && api_ws_write_string(&wb, event) && api_ws_write_object(&wb, object)) {
1246                 rc = afb_ws_binary_v(client->ws, wb.iovec, wb.count);
1247                 if (rc < 0)
1248                         ERROR("error while broadcasting event %s", event);
1249         } else
1250                 ERROR("error while broadcasting event %s", event);
1251         json_object_put(object);
1252 }
1253
1254 /******************* ws request part for server *****************/
1255
1256 /* decrement the reference count of the request and free/release it on falling to null */
1257 static void api_ws_server_req_destroy_cb(struct afb_xreq *xreq)
1258 {
1259         struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
1260
1261         afb_context_disconnect(&wreq->xreq.context);
1262         afb_cred_unref(wreq->xreq.cred);
1263         json_object_put(wreq->xreq.json);
1264         api_ws_server_client_unref(wreq->client);
1265         free(wreq);
1266 }
1267
1268 static void api_ws_server_req_success_cb(struct afb_xreq *xreq, struct json_object *obj, const char *info)
1269 {
1270         int rc;
1271         struct writebuf wb = { .count = 0 };
1272         struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
1273
1274         if (api_ws_write_char(&wb, CHAR_FOR_ANSWER_SUCCESS)
1275          && api_ws_write_uint32(&wb, wreq->msgid)
1276          && api_ws_write_uint32(&wb, (uint32_t)wreq->xreq.context.flags)
1277          && api_ws_write_string(&wb, info ? : "")
1278          && api_ws_write_object(&wb, obj)) {
1279                 rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1280                 if (rc >= 0)
1281                         goto success;
1282         }
1283         ERROR("error while sending success");
1284 success:
1285         json_object_put(obj);
1286 }
1287
1288 static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status, const char *info)
1289 {
1290         int rc;
1291         struct writebuf wb = { .count = 0 };
1292         struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
1293
1294         if (api_ws_write_char(&wb, CHAR_FOR_ANSWER_FAIL)
1295          && api_ws_write_uint32(&wb, wreq->msgid)
1296          && api_ws_write_uint32(&wb, (uint32_t)wreq->xreq.context.flags)
1297          && api_ws_write_string(&wb, status)
1298          && api_ws_write_string(&wb, info ? : "")) {
1299                 rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1300                 if (rc >= 0)
1301                         return;
1302         }
1303         ERROR("error while sending fail");
1304 }
1305
1306 static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure)
1307 {
1308         int rc;
1309         struct writebuf wb = { .count = 0 };
1310         struct api_ws_subcall *sc, *osc;
1311         struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
1312         struct api_ws_client *client = wreq->client;
1313
1314         sc = malloc(sizeof *sc);
1315         if (!sc) {
1316
1317         } else {
1318                 sc->callback = callback;
1319                 sc->closure = cb_closure;
1320
1321                 pthread_mutex_unlock(&client->mutex);
1322                 sc->subcallid = ptr2id(sc);
1323                 do {
1324                         sc->subcallid++;
1325                         osc = client->subcalls;
1326                         while(osc && osc->subcallid != sc->subcallid)
1327                                 osc = osc->next;
1328                 } while (osc);
1329                 sc->next = client->subcalls;
1330                 client->subcalls = sc;
1331                 pthread_mutex_unlock(&client->mutex);
1332
1333                 if (api_ws_write_char(&wb, CHAR_FOR_SUBCALL_CALL)
1334                  && api_ws_write_uint32(&wb, wreq->msgid)
1335                  && api_ws_write_uint32(&wb, sc->subcallid)
1336                  && api_ws_write_string(&wb, api)
1337                  && api_ws_write_string(&wb, verb)
1338                  && api_ws_write_object(&wb, args)) {
1339                         rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1340                         if (rc >= 0)
1341                                 return;
1342                 }
1343                 ERROR("error while sending fail");
1344         }
1345 }
1346
1347 static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event)
1348 {
1349         int rc, rc2;
1350         struct writebuf wb = { .count = 0 };
1351         struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
1352
1353         rc = afb_evt_add_watch(wreq->client->listener, event);
1354         if (rc < 0)
1355                 return rc;
1356
1357         if (api_ws_write_char(&wb, CHAR_FOR_EVT_SUBSCRIBE)
1358          && api_ws_write_uint32(&wb, wreq->msgid)
1359          && api_ws_write_uint32(&wb, (uint32_t)afb_evt_event_id(event))
1360          && api_ws_write_string(&wb, afb_evt_event_name(event))) {
1361                 rc2 = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1362                 if (rc2 >= 0)
1363                         goto success;
1364         }
1365         ERROR("error while subscribing event");
1366 success:
1367         return rc;
1368 }
1369
1370 static int api_ws_server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event)
1371 {
1372         int rc, rc2;
1373         struct writebuf wb = { .count = 0 };
1374         struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
1375
1376         if (api_ws_write_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE)
1377          && api_ws_write_uint32(&wb, wreq->msgid)
1378          && api_ws_write_uint32(&wb, (uint32_t)afb_evt_event_id(event))
1379          && api_ws_write_string(&wb, afb_evt_event_name(event))) {
1380                 rc2 = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1381                 if (rc2 >= 0)
1382                         goto success;
1383         }
1384         ERROR("error while subscribing event");
1385 success:
1386         rc = afb_evt_remove_watch(wreq->client->listener, event);
1387         return rc;
1388 }
1389
1390 /******************* server part **********************************/
1391
1392 static int api_ws_server_connect(struct api_ws *api);
1393
1394 static int api_ws_server_listen_callback(sd_event_source *src, int fd, uint32_t revents, void *closure)
1395 {
1396         if ((revents & EPOLLIN) != 0)
1397                 api_ws_server_accept(closure);
1398         if ((revents & EPOLLHUP) != 0)
1399                 api_ws_server_connect(closure);
1400         return 0;
1401 }
1402
1403 static void api_ws_server_disconnect(struct api_ws *api)
1404 {
1405         if (api->server.listensrc != NULL) {
1406                 sd_event_source_unref(api->server.listensrc);
1407                 api->server.listensrc = NULL;
1408         }
1409         if (api->fd >= 0) {
1410                 close(api->fd);
1411                 api->fd = -1;
1412         }
1413 }
1414
1415 static int api_ws_server_connect(struct api_ws *api)
1416 {
1417         int rc;
1418
1419         /* ensure disconnected */
1420         api_ws_server_disconnect(api);
1421
1422         /* request the service object name */
1423         api->fd = api_ws_socket(api->path, 1);
1424         if (api->fd < 0)
1425                 ERROR("can't create socket %s", api->path);
1426         else {
1427                 /* listen for service */
1428                 rc = sd_event_add_io(afb_common_get_event_loop(), &api->server.listensrc, api->fd, EPOLLIN, api_ws_server_listen_callback, api);
1429                 if (rc >= 0)
1430                         return 0;
1431                 close(api->fd);
1432                 errno = -rc;
1433                 ERROR("can't add ws object %s", api->path);
1434         }
1435         return -1;
1436 }
1437
1438 /* create the service */
1439 int afb_api_ws_add_server(const char *path, struct afb_apiset *apiset)
1440 {
1441         int rc;
1442         struct api_ws *api;
1443
1444         /* creates the ws api object */
1445         api = api_ws_make(path);
1446         if (api == NULL)
1447                 goto error;
1448
1449         /* connect for serving */
1450         rc = api_ws_server_connect(api);
1451         if (rc < 0)
1452                 goto error2;
1453
1454         api->server.apiset = afb_apiset_addref(apiset);
1455         return 0;
1456
1457 error2:
1458         free(api);
1459 error:
1460         return -1;
1461 }
1462
1463