use heuristic message ids
[src/app-framework-binder.git] / src / afb-api-ws.c
1 /*
2  * Copyright (C) 2015, 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19 #define NO_PLUGIN_VERBOSE_MACRO
20
21 #include <stdlib.h>
22 #include <string.h>
23 #include <assert.h>
24 #include <fcntl.h>
25 #include <unistd.h>
26 #include <errno.h>
27 #include <endian.h>
28 #include <netdb.h>
29 #include <sys/types.h>
30 #include <sys/socket.h>
31 #include <sys/un.h>
32 #include <pthread.h>
33
34 #include <json-c/json.h>
35 #include <systemd/sd-event.h>
36
37 #include <afb/afb-req-itf.h>
38
39 #include "afb-common.h"
40
41 #include "afb-session.h"
42 #include "afb-cred.h"
43 #include "afb-ws.h"
44 #include "afb-msg-json.h"
45 #include "afb-api.h"
46 #include "afb-apiset.h"
47 #include "afb-api-so.h"
48 #include "afb-context.h"
49 #include "afb-evt.h"
50 #include "afb-xreq.h"
51 #include "verbose.h"
52 #include "sd-fds.h"
53
54 struct api_ws_memo;
55 struct api_ws_event;
56 struct api_ws_client;
57
58 #define CHAR_FOR_CALL             'C'
59 #define CHAR_FOR_ANSWER_SUCCESS   'T'
60 #define CHAR_FOR_ANSWER_FAIL      'F'
61 #define CHAR_FOR_EVT_BROADCAST    '*'
62 #define CHAR_FOR_EVT_ADD          '+'
63 #define CHAR_FOR_EVT_DEL          '-'
64 #define CHAR_FOR_EVT_PUSH         '!'
65 #define CHAR_FOR_EVT_SUBSCRIBE    'S'
66 #define CHAR_FOR_EVT_UNSUBSCRIBE  'U'
67 #define CHAR_FOR_SUBCALL_CALL     'B'
68 #define CHAR_FOR_SUBCALL_REPLY    'R'
69
70 /*
71  */
72 struct api_ws
73 {
74         char *path;             /* path of the object for the API */
75         char *api;              /* api name of the interface */
76         int fd;                 /* file descriptor */
77         pthread_mutex_t mutex;  /**< resource control */
78         union {
79                 struct {
80                         struct afb_ws *ws;
81                         struct api_ws_event *events;
82                         struct api_ws_memo *memos;
83                 } client;
84                 struct {
85                         sd_event_source *listensrc; /**< systemd source for server socket */
86                         struct afb_apiset *apiset;
87                 } server;
88         };
89 };
90
91 #define RETOK   1
92 #define RETERR  2
93 #define RETRAW  3
94
95 /******************* common usefull tools **********************************/
96
97 /**
98  * translate a pointer to some integer
99  * @param ptr the pointer to translate
100  * @return an integer
101  */
102 static inline uint32_t ptr2id(void *ptr)
103 {
104         return (uint32_t)(((intptr_t)ptr) >> 6);
105 }
106
107 /******************* websocket interface for client part **********************************/
108
109 static void api_ws_client_on_binary(void *closure, char *data, size_t size);
110
111 static const struct afb_ws_itf api_ws_client_ws_itf =
112 {
113         .on_close = NULL,
114         .on_text = NULL,
115         .on_binary = api_ws_client_on_binary,
116         .on_error = NULL,
117         .on_hangup = NULL
118 };
119
120 /******************* event structures for server part **********************************/
121
122 static void api_ws_server_event_add(void *closure, const char *event, int eventid);
123 static void api_ws_server_event_remove(void *closure, const char *event, int eventid);
124 static void api_ws_server_event_push(void *closure, const char *event, int eventid, struct json_object *object);
125 static void api_ws_server_event_broadcast(void *closure, const char *event, int eventid, struct json_object *object);
126
127 /* the interface for events pushing */
128 static const struct afb_evt_itf api_ws_server_evt_itf = {
129         .broadcast = api_ws_server_event_broadcast,
130         .push = api_ws_server_event_push,
131         .add = api_ws_server_event_add,
132         .remove = api_ws_server_event_remove
133 };
134
135 /******************* handling subcalls *****************************/
136
137 /**
138  * Structure on server side for recording pending
139  * subcalls.
140  */
141 struct api_ws_subcall
142 {
143         struct api_ws_subcall *next;    /**< next subcall for the client */
144         uint32_t subcallid;             /**< the subcallid */
145         void (*callback)(void*, int, struct json_object*); /**< callback on completion */
146         void *closure;                  /**< closure of the callback */
147 };
148
149 /**
150  * Structure for sending back replies on client side
151  */
152 struct api_ws_reply
153 {
154         struct api_ws *apiws;   /**< api descriptor */
155         uint32_t subcallid;     /**< subcallid for the reply */
156 };
157
158 /******************* client description part for server *****************************/
159
160 struct api_ws_client
161 {
162         /* the server ws-api */
163         const char *api;
164
165         /* count of references */
166         int refcount;
167
168         /* file descriptor */
169         int fd;
170
171         /* resource control */
172         pthread_mutex_t mutex;
173
174         /* listener for events */
175         struct afb_evt_listener *listener;
176
177         /* websocket */
178         struct afb_ws *ws;
179
180         /* credentials */
181         struct afb_cred *cred;
182
183         /* pending subcalls */
184         struct api_ws_subcall *subcalls;
185
186         /* apiset */
187         struct afb_apiset *apiset;
188 };
189
190 /******************* websocket interface for client part **********************************/
191
192 static void api_ws_server_on_binary(void *closure, char *data, size_t size);
193 static void api_ws_server_on_hangup(void *closure);
194
195 static const struct afb_ws_itf api_ws_server_ws_itf =
196 {
197         .on_close = NULL,
198         .on_text = NULL,
199         .on_binary = api_ws_server_on_binary,
200         .on_error = NULL,
201         .on_hangup = api_ws_server_on_hangup
202 };
203
204 /******************* ws request part for server *****************/
205
206 /*
207  * structure for a ws request
208  */
209 struct api_ws_server_req {
210         struct afb_xreq xreq;           /* the xreq */
211         struct api_ws_client *client;   /* the client of the request */
212         uint32_t msgid;                 /* the incoming request msgid */
213 };
214
215 static void api_ws_server_req_success_cb(struct afb_xreq *xreq, struct json_object *obj, const char *info);
216 static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status, const char *info);
217 static void api_ws_server_req_destroy_cb(struct afb_xreq *xreq);
218 static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event);
219 static int api_ws_server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event);
220 static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure);
221
222 const struct afb_xreq_query_itf afb_api_ws_xreq_itf = {
223         .success = api_ws_server_req_success_cb,
224         .fail = api_ws_server_req_fail_cb,
225         .unref = api_ws_server_req_destroy_cb,
226         .subcall = api_ws_server_req_subcall_cb,
227         .subscribe = api_ws_server_req_subscribe_cb,
228         .unsubscribe = api_ws_server_req_unsubscribe_cb
229 };
230
231 /******************* common part **********************************/
232
233 /*
234  * create a structure api_ws not connected to the 'path'.
235  */
236 static struct api_ws *api_ws_make(const char *path)
237 {
238         struct api_ws *api;
239         size_t length;
240
241         /* allocates the structure */
242         length = strlen(path);
243         api = calloc(1, sizeof *api + 1 + length);
244         if (api == NULL) {
245                 errno = ENOMEM;
246                 goto error;
247         }
248
249         /* path is copied after the struct */
250         api->path = (char*)(api+1);
251         memcpy(api->path, path, length + 1);
252
253         /* api name is at the end of the path */
254         while (length && path[length - 1] != '/' && path[length - 1] != ':')
255                 length = length - 1;
256         api->api = &api->path[length];
257         if (api->api == NULL || !afb_api_is_valid_name(api->api)) {
258                 errno = EINVAL;
259                 goto error2;
260         }
261         pthread_mutex_init(&api->mutex, NULL);
262
263         api->fd = -1;
264         return api;
265
266 error2:
267         free(api);
268 error:
269         return NULL;
270 }
271
272 static int api_ws_socket_unix(const char *path, int server)
273 {
274         int fd, rc;
275         struct sockaddr_un addr;
276         size_t length;
277
278         length = strlen(path);
279         if (length >= 108) {
280                 errno = ENAMETOOLONG;
281                 return -1;
282         }
283
284         if (server)
285                 unlink(path);
286
287         fd = socket(AF_UNIX, SOCK_STREAM, 0);
288         if (fd < 0)
289                 return fd;
290
291         memset(&addr, 0, sizeof addr);
292         addr.sun_family = AF_UNIX;
293         strcpy(addr.sun_path, path);
294         if (server) {
295                 rc = bind(fd, (struct sockaddr *) &addr, (socklen_t)(sizeof addr));
296         } else {
297                 rc = connect(fd, (struct sockaddr *) &addr, (socklen_t)(sizeof addr));
298         }
299         if (rc < 0) {
300                 close(fd);
301                 return rc;
302         }
303         return fd;
304 }
305
306 static int api_ws_socket_inet(const char *path, int server)
307 {
308         int rc, fd;
309         const char *service, *host, *api;
310         struct addrinfo hint, *rai, *iai;
311
312         /* scan the uri */
313         api = strrchr(path, '/');
314         service = strrchr(path, ':');
315         if (api == NULL || service == NULL || api < service) {
316                 errno = EINVAL;
317                 return -1;
318         }
319         host = strndupa(path, service++ - path);
320         service = strndupa(service, api - service);
321
322         /* get addr */
323         memset(&hint, 0, sizeof hint);
324         hint.ai_family = AF_INET;
325         hint.ai_socktype = SOCK_STREAM;
326         rc = getaddrinfo(host, service, &hint, &rai);
327         if (rc != 0) {
328                 errno = EINVAL;
329                 return -1;
330         }
331
332         /* get the socket */
333         iai = rai;
334         while (iai != NULL) {
335                 fd = socket(iai->ai_family, iai->ai_socktype, iai->ai_protocol);
336                 if (fd >= 0) {
337                         if (server) {
338                                 rc = bind(fd, iai->ai_addr, iai->ai_addrlen);
339                         } else {
340                                 rc = connect(fd, iai->ai_addr, iai->ai_addrlen);
341                         }
342                         if (rc == 0) {
343                                 freeaddrinfo(rai);
344                                 return fd;
345                         }
346                         close(fd);
347                 }
348                 iai = iai->ai_next;
349         }
350         freeaddrinfo(rai);
351         return -1;
352         
353 }
354
355 static int api_ws_socket(const char *path, int server)
356 {
357         int fd, rc;
358
359         /* check for systemd socket */
360         if (0 == strncmp(path, "sd:", 3))
361                 fd = sd_fds_for(path + 3);
362         else {
363                 /* check for unix socket */
364                 if (0 == strncmp(path, "unix:", 5))
365                         /* unix socket */
366                         fd = api_ws_socket_unix(path + 5, server);
367                 else
368                         /* inet socket */
369                         fd = api_ws_socket_inet(path, server);
370
371                 if (fd >= 0 && server) {
372                         rc = 1;
373                         setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &rc, sizeof rc);
374                         rc = listen(fd, 5);
375                 }
376         }
377         /* configure the socket */
378         if (fd >= 0) {
379                 fcntl(fd, F_SETFD, FD_CLOEXEC);
380                 fcntl(fd, F_SETFL, O_NONBLOCK);
381         }
382         return fd;
383 }
384
385 /******************* serialisation part **********************************/
386
387 struct readbuf
388 {
389         char *head, *end;
390 };
391
392 #define WRITEBUF_COUNT_MAX  32
393 struct writebuf
394 {
395         struct iovec iovec[WRITEBUF_COUNT_MAX];
396         uint32_t uints[WRITEBUF_COUNT_MAX];
397         int count;
398 };
399
400 static char *api_ws_read_get(struct readbuf *rb, uint32_t length)
401 {
402         char *before = rb->head;
403         char *after = before + length;
404         if (after > rb->end)
405                 return 0;
406         rb->head = after;
407         return before;
408 }
409
410 static int api_ws_read_char(struct readbuf *rb, char *value)
411 {
412         if (rb->head >= rb->end)
413                 return 0;
414         *value = *rb->head++;
415         return 1;
416 }
417
418 static int api_ws_read_uint32(struct readbuf *rb, uint32_t *value)
419 {
420         char *after = rb->head + sizeof *value;
421         if (after > rb->end)
422                 return 0;
423         memcpy(value, rb->head, sizeof *value);
424         rb->head = after;
425         *value = le32toh(*value);
426         return 1;
427 }
428
429 static int api_ws_read_string(struct readbuf *rb, const char **value, size_t *length)
430 {
431         uint32_t len;
432         if (!api_ws_read_uint32(rb, &len) || !len)
433                 return 0;
434         if (length)
435                 *length = (size_t)(len - 1);
436         return (*value = api_ws_read_get(rb, len)) != NULL &&  rb->head[-1] == 0;
437 }
438
439 static int api_ws_read_object(struct readbuf *rb, struct json_object **object)
440 {
441         const char *string;
442         struct json_object *o;
443         int rc = api_ws_read_string(rb, &string, NULL);
444         if (rc) {
445                 o = json_tokener_parse(string);
446                 if (o == NULL && strcmp(string, "null"))
447                         o = json_object_new_string(string);
448                 *object = o;
449         }
450         return rc;
451 }
452
453 static int api_ws_write_put(struct writebuf *wb, const void *value, size_t length)
454 {
455         int i = wb->count;
456         if (i == WRITEBUF_COUNT_MAX)
457                 return 0;
458         wb->iovec[i].iov_base = (void*)value;
459         wb->iovec[i].iov_len = length;
460         wb->count = i + 1;
461         return 1;
462 }
463
464 static int api_ws_write_char(struct writebuf *wb, char value)
465 {
466         int i = wb->count;
467         if (i == WRITEBUF_COUNT_MAX)
468                 return 0;
469         *(char*)&wb->uints[i] = value;
470         wb->iovec[i].iov_base = &wb->uints[i];
471         wb->iovec[i].iov_len = 1;
472         wb->count = i + 1;
473         return 1;
474 }
475
476 static int api_ws_write_uint32(struct writebuf *wb, uint32_t value)
477 {
478         int i = wb->count;
479         if (i == WRITEBUF_COUNT_MAX)
480                 return 0;
481         wb->uints[i] = htole32(value);
482         wb->iovec[i].iov_base = &wb->uints[i];
483         wb->iovec[i].iov_len = sizeof wb->uints[i];
484         wb->count = i + 1;
485         return 1;
486 }
487
488 static int api_ws_write_string_length(struct writebuf *wb, const char *value, size_t length)
489 {
490         uint32_t len = (uint32_t)++length;
491         return (size_t)len == length && len && api_ws_write_uint32(wb, len) && api_ws_write_put(wb, value, length);
492 }
493
494 static int api_ws_write_string(struct writebuf *wb, const char *value)
495 {
496         return api_ws_write_string_length(wb, value, strlen(value));
497 }
498
499 static int api_ws_write_object(struct writebuf *wb, struct json_object *object)
500 {
501         const char *string = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
502         return string != NULL && api_ws_write_string(wb, string);
503 }
504
505 /******************* client part **********************************/
506
507 /*
508  * structure for recording query data
509  */
510 struct api_ws_memo {
511         struct api_ws_memo *next;       /* the next memo */
512         struct api_ws *api;             /* the ws api */
513         struct afb_xreq *xreq;          /* the request handle */
514         uint32_t msgid;                 /* the message identifier */
515 };
516
517 struct api_ws_event
518 {
519         struct api_ws_event *next;
520         struct afb_event event;
521         int eventid;
522         int refcount;
523 };
524
525 /* search a memorized request */
526 static struct api_ws_memo *api_ws_client_memo_search(struct api_ws *api, uint32_t msgid)
527 {
528         struct api_ws_memo *memo;
529
530         memo = api->client.memos;
531         while (memo != NULL && memo->msgid != msgid)
532                 memo = memo->next;
533
534         return memo;
535 }
536
537 /* search the event */
538 static struct api_ws_event *api_ws_client_event_search(struct api_ws *api, uint32_t eventid, const char *name)
539 {
540         struct api_ws_event *ev;
541
542         ev = api->client.events;
543         while (ev != NULL && (ev->eventid != eventid || 0 != strcmp(afb_evt_event_name(ev->event), name)))
544                 ev = ev->next;
545
546         return ev;
547 }
548
549
550 /* allocates and init the memorizing data */
551 static struct api_ws_memo *api_ws_client_memo_make(struct api_ws *api, struct afb_xreq *xreq)
552 {
553         struct api_ws_memo *memo;
554
555         memo = malloc(sizeof *memo);
556         if (memo != NULL) {
557                 afb_xreq_addref(xreq);
558                 memo->xreq = xreq;
559                 memo->msgid = ptr2id(memo);
560                 while(api_ws_client_memo_search(api, memo->msgid) != NULL)
561                         memo->msgid++;
562                 memo->api = api;
563                 memo->next = api->client.memos;
564                 api->client.memos = memo;
565         }
566         return memo;
567 }
568
569 /* free and release the memorizing data */
570 static void api_ws_client_memo_destroy(struct api_ws_memo *memo)
571 {
572         struct api_ws_memo **prv;
573
574         prv = &memo->api->client.memos;
575         while (*prv != NULL) {
576                 if (*prv == memo) {
577                         *prv = memo->next;
578                         break;
579                 }
580                 prv = &(*prv)->next;
581         }
582
583         afb_xreq_unref(memo->xreq);
584         free(memo);
585 }
586
587 /* get event data from the message */
588 static int api_ws_client_msg_event_read(struct readbuf *rb, uint32_t *eventid, const char **name)
589 {
590         return api_ws_read_uint32(rb, eventid) && api_ws_read_string(rb, name, NULL);
591 }
592
593 /* get event from the message */
594 static int api_ws_client_msg_event_get(struct api_ws *api, struct readbuf *rb, struct api_ws_event **ev)
595 {
596         const char *name;
597         uint32_t eventid;
598
599         /* get event data from the message */
600         if (!api_ws_client_msg_event_read(rb, &eventid, &name)) {
601                 ERROR("Invalid message");
602                 return 0;
603         }
604
605         /* check conflicts */
606         *ev = api_ws_client_event_search(api, eventid, name);
607         if (*ev == NULL) {
608                 ERROR("event %s not found", name);
609                 return 0;
610         }
611
612         return 1;
613 }
614
615 /* get event from the message */
616 static int api_ws_client_msg_memo_get(struct api_ws *api, struct readbuf *rb, struct api_ws_memo **memo)
617 {
618         uint32_t msgid;
619
620         /* get event data from the message */
621         if (!api_ws_read_uint32(rb, &msgid)) {
622                 ERROR("Invalid message");
623                 return 0;
624         }
625
626         /* get the memo */
627         *memo = api_ws_client_memo_search(api, msgid);
628         if (*memo == NULL) {
629                 ERROR("message not found");
630                 return 0;
631         }
632
633         return 1;
634 }
635
636 /* read a subscrition message */
637 static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_memo **memo, struct api_ws_event **ev)
638 {
639         return api_ws_client_msg_memo_get(api, rb, memo) && api_ws_client_msg_event_get(api, rb, ev);
640 }
641
642 /* adds an event */
643 static void api_ws_client_event_create(struct api_ws *api, struct readbuf *rb)
644 {
645         size_t offset;
646         const char *name;
647         uint32_t eventid;
648         struct api_ws_event *ev;
649
650         /* get event data from the message */
651         offset = api_ws_client_msg_event_read(rb, &eventid, &name);
652         if (offset == 0) {
653                 ERROR("Invalid message");
654                 return;
655         }
656
657         /* check conflicts */
658         ev = api_ws_client_event_search(api, eventid, name);
659         if (ev != NULL) {
660                 ev->refcount++;
661                 return;
662         }
663
664         /* no conflict, try to add it */
665         ev = malloc(sizeof *ev);
666         if (ev != NULL) {
667                 ev->event = afb_evt_create_event(name);
668                 if (ev->event.closure == NULL)
669                         free(ev);
670                 else {
671                         ev->refcount = 1;
672                         ev->eventid = eventid;
673                         ev->next = api->client.events;
674                         api->client.events = ev;
675                         return;
676                 }
677         }
678         ERROR("can't create event %s, out of memory", name);
679 }
680
681 /* removes an event */
682 static void api_ws_client_event_drop(struct api_ws *api, struct readbuf *rb)
683 {
684         struct api_ws_event *ev, **prv;
685
686         /* retrieves the event */
687         if (!api_ws_client_msg_event_get(api, rb, &ev))
688                 return;
689
690         /* decrease the reference count */
691         if (--ev->refcount)
692                 return;
693
694         /* unlinks the event */
695         prv = &api->client.events;
696         while (*prv != ev)
697                 prv = &(*prv)->next;
698         *prv = ev->next;
699
700         /* destroys the event */
701         afb_event_drop(ev->event);
702         free(ev);
703 }
704
705 /* subscribes an event */
706 static void api_ws_client_event_subscribe(struct api_ws *api, struct readbuf *rb)
707 {
708         struct api_ws_event *ev;
709         struct api_ws_memo *memo;
710
711         if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) {
712                 /* subscribe the request from the event */
713                 if (afb_xreq_subscribe(memo->xreq, ev->event) < 0)
714                         ERROR("can't subscribe: %m");
715         }
716 }
717
718 /* unsubscribes an event */
719 static void api_ws_client_event_unsubscribe(struct api_ws *api, struct readbuf *rb)
720 {
721         struct api_ws_event *ev;
722         struct api_ws_memo *memo;
723
724         if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) {
725                 /* unsubscribe the request from the event */
726                 if (afb_xreq_unsubscribe(memo->xreq, ev->event) < 0)
727                         ERROR("can't unsubscribe: %m");
728         }
729 }
730
731 /* receives broadcasted events */
732 static void api_ws_client_event_broadcast(struct api_ws *api, struct readbuf *rb)
733 {
734         struct json_object *object;
735         const char *event;
736
737         if (api_ws_read_string(rb, &event, NULL) && api_ws_read_object(rb, &object))
738                 afb_evt_broadcast(event, object);
739         else
740                 ERROR("unreadable broadcasted event");
741 }
742
743 /* pushs an event */
744 static void api_ws_client_event_push(struct api_ws *api, struct readbuf *rb)
745 {
746         struct api_ws_event *ev;
747         struct json_object *object;
748
749         if (api_ws_client_msg_event_get(api, rb, &ev) && api_ws_read_object(rb, &object))
750                 afb_event_push(ev->event, object);
751         else
752                 ERROR("unreadable push event");
753 }
754
755 static void api_ws_client_reply_success(struct api_ws *api, struct readbuf *rb)
756 {
757         struct api_ws_memo *memo;
758         struct json_object *object;
759         const char *info;
760         uint32_t flags;
761
762         /* retrieve the message data */
763         if (!api_ws_client_msg_memo_get(api, rb, &memo))
764                 return;
765
766         if (api_ws_read_uint32(rb, &flags)
767          && api_ws_read_string(rb, &info, NULL)
768          && api_ws_read_object(rb, &object)) {
769                 memo->xreq->context.flags = (unsigned)flags;
770                 afb_xreq_success(memo->xreq, object, *info ? info : NULL);
771         } else {
772                 /* failing to have the answer */
773                 afb_xreq_fail(memo->xreq, "error", "ws error");
774         }
775         api_ws_client_memo_destroy(memo);
776 }
777
778 static void api_ws_client_reply_fail(struct api_ws *api, struct readbuf *rb)
779 {
780         struct api_ws_memo *memo;
781         const char *info, *status;
782         uint32_t flags;
783
784         /* retrieve the message data */
785         if (!api_ws_client_msg_memo_get(api, rb, &memo))
786                 return;
787
788         if (api_ws_read_uint32(rb, &flags)
789          && api_ws_read_string(rb, &status, NULL)
790          && api_ws_read_string(rb, &info, NULL)) {
791                 memo->xreq->context.flags = (unsigned)flags;
792                 afb_xreq_fail(memo->xreq, status, *info ? info : NULL);
793         } else {
794                 /* failing to have the answer */
795                 afb_xreq_fail(memo->xreq, "error", "ws error");
796         }
797         api_ws_client_memo_destroy(memo);
798 }
799
800 /* send a subcall reply */
801 static void api_ws_client_send_subcall_reply(struct api_ws_reply *reply, int iserror, json_object *object)
802 {
803         int rc;
804         struct writebuf wb = { .count = 0 };
805         char ie = (char)!!iserror;
806
807         if (!api_ws_write_char(&wb, CHAR_FOR_SUBCALL_REPLY)
808          || !api_ws_write_uint32(&wb, reply->subcallid)
809          || !api_ws_write_char(&wb, ie)
810          || !api_ws_write_object(&wb, object)) {
811                 /* write error ? */
812                 return;
813         }
814
815         rc = afb_ws_binary_v(reply->apiws->client.ws, wb.iovec, wb.count);
816         if (rc >= 0)
817                 return;
818         ERROR("error while sending subcall reply");
819 }
820
821 /* callback for subcall reply */
822 static void api_ws_client_subcall_reply_cb(void *closure, int iserror, json_object *object)
823 {
824         api_ws_client_send_subcall_reply(closure, iserror, object);
825         free(closure);
826 }
827
828 /* received a subcall request */
829 static void api_ws_client_subcall(struct api_ws *apiws, struct readbuf *rb)
830 {
831         struct api_ws_reply *reply;
832         struct api_ws_memo *memo;
833         const char *api, *verb;
834         uint32_t subcallid;
835         struct json_object *object;
836
837         reply = malloc(sizeof *reply);
838         if (!reply)
839                 return;
840
841         /* retrieve the message data */
842         if (!api_ws_client_msg_memo_get(apiws, rb, &memo))
843                 return;
844
845         if (api_ws_read_uint32(rb, &subcallid)
846          && api_ws_read_string(rb, &api, NULL)
847          && api_ws_read_string(rb, &verb, NULL)
848          && api_ws_read_object(rb, &object)) {
849                 reply->apiws = apiws;
850                 reply->subcallid = subcallid;
851                 afb_xreq_subcall(memo->xreq, api, verb, object, api_ws_client_subcall_reply_cb, reply);
852         }
853 }
854
855 /* callback when receiving binary data */
856 static void api_ws_client_on_binary(void *closure, char *data, size_t size)
857 {
858         if (size > 0) {
859                 struct api_ws *apiws = closure;
860                 struct readbuf rb = { .head = data, .end = data + size };
861
862                 pthread_mutex_lock(&apiws->mutex);
863                 switch (*rb.head++) {
864                 case CHAR_FOR_ANSWER_SUCCESS: /* success */
865                         api_ws_client_reply_success(apiws, &rb);
866                         break;
867                 case CHAR_FOR_ANSWER_FAIL: /* fail */
868                         api_ws_client_reply_fail(apiws, &rb);
869                         break;
870                 case CHAR_FOR_EVT_BROADCAST: /* broadcast */
871                         api_ws_client_event_broadcast(apiws, &rb);
872                         break;
873                 case CHAR_FOR_EVT_ADD: /* creates the event */
874                         api_ws_client_event_create(apiws, &rb);
875                         break;
876                 case CHAR_FOR_EVT_DEL: /* drops the event */
877                         api_ws_client_event_drop(apiws, &rb);
878                         break;
879                 case CHAR_FOR_EVT_PUSH: /* pushs the event */
880                         api_ws_client_event_push(apiws, &rb);
881                         break;
882                 case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
883                         api_ws_client_event_subscribe(apiws, &rb);
884                         break;
885                 case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
886                         api_ws_client_event_unsubscribe(apiws, &rb);
887                         break;
888                 case CHAR_FOR_SUBCALL_CALL: /* subcall */
889                         api_ws_client_subcall(apiws, &rb);
890                         break;
891                 default: /* unexpected message */
892                         /* TODO: close the connection */
893                         break;
894                 }
895                 pthread_mutex_unlock(&apiws->mutex);
896         }
897         free(data);
898 }
899
900 /* on call, propagate it to the ws service */
901 static void api_ws_client_call_cb(void * closure, struct afb_xreq *xreq)
902 {
903         int rc;
904         struct api_ws_memo *memo;
905         struct writebuf wb = { .count = 0 };
906         const char *raw;
907         size_t szraw;
908         struct api_ws *apiws = closure;
909
910         pthread_mutex_lock(&apiws->mutex);
911
912         /* create the recording data */
913         memo = api_ws_client_memo_make(apiws, xreq);
914         if (memo == NULL) {
915                 afb_xreq_fail_f(xreq, "error", "out of memory");
916                 goto end;
917         }
918
919         /* creates the call message */
920         raw = afb_xreq_raw(xreq, &szraw);
921         if (raw == NULL)
922                 goto internal_error;
923         if (!api_ws_write_char(&wb, CHAR_FOR_CALL)
924          || !api_ws_write_uint32(&wb, memo->msgid)
925          || !api_ws_write_uint32(&wb, (uint32_t)xreq->context.flags)
926          || !api_ws_write_string(&wb, xreq->verb)
927          || !api_ws_write_string(&wb, afb_session_uuid(xreq->context.session))
928          || !api_ws_write_string_length(&wb, raw, szraw))
929                 goto overflow;
930
931         /* send */
932         rc = afb_ws_binary_v(apiws->client.ws, wb.iovec, wb.count);
933         if (rc >= 0)
934                 goto end;
935
936         afb_xreq_fail(xreq, "error", "websocket sending error");
937         goto clean_memo;
938
939 internal_error:
940         afb_xreq_fail(xreq, "error", "internal: raw is NULL!");
941         goto clean_memo;
942
943 overflow:
944         afb_xreq_fail(xreq, "error", "overflow: size doesn't match 32 bits!");
945
946 clean_memo:
947         api_ws_client_memo_destroy(memo);
948 end:
949         pthread_mutex_unlock(&apiws->mutex);
950 }
951
952 /*  */
953 static void api_ws_client_disconnect(struct api_ws *api)
954 {
955         if (api->fd >= 0) {
956                 afb_ws_destroy(api->client.ws);
957                 api->client.ws = NULL;
958                 close(api->fd);
959                 api->fd = -1;
960         }
961 }
962
963 /*  */
964 static int api_ws_client_connect(struct api_ws *api)
965 {
966         struct afb_ws *ws;
967         int fd;
968
969         fd = api_ws_socket(api->path, 0);
970         if (fd >= 0) {
971                 ws = afb_ws_create(afb_common_get_event_loop(), fd, &api_ws_client_ws_itf, api);
972                 if (ws != NULL) {
973                         api->client.ws = ws;
974                         api->fd = fd;
975                         return 0;
976                 }
977                 close(fd);
978         }
979         return -1;
980 }
981
982 static struct afb_api_itf ws_api_itf = {
983         .call = api_ws_client_call_cb
984 };
985
986 /* adds a afb-ws-service client api */
987 int afb_api_ws_add_client(const char *path, struct afb_apiset *apiset)
988 {
989         int rc;
990         struct api_ws *api;
991         struct afb_api afb_api;
992
993         /* create the ws client api */
994         api = api_ws_make(path);
995         if (api == NULL)
996                 goto error;
997
998         /* connect to the service */
999         rc = api_ws_client_connect(api);
1000         if (rc < 0) {
1001                 ERROR("can't connect to ws service %s", api->path);
1002                 goto error2;
1003         }
1004
1005         /* record it as an API */
1006         afb_api.closure = api;
1007         afb_api.itf = &ws_api_itf;
1008         if (afb_apiset_add(apiset, api->api, afb_api) < 0)
1009                 goto error3;
1010
1011         return 0;
1012
1013 error3:
1014         api_ws_client_disconnect(api);
1015 error2:
1016         free(api);
1017 error:
1018         return -1;
1019 }
1020
1021 /******************* client description part for server *****************************/
1022
1023 static void api_ws_server_client_unref(struct api_ws_client *client)
1024 {
1025         struct api_ws_subcall *sc, *nsc;
1026
1027         if (!__atomic_sub_fetch(&client->refcount, 1, __ATOMIC_RELAXED)) {
1028                 afb_evt_listener_unref(client->listener);
1029                 afb_ws_destroy(client->ws);
1030                 nsc = client->subcalls;
1031                 while (nsc) {
1032                         sc= nsc;
1033                         nsc = sc->next;
1034                         sc->callback(sc->closure, 1, NULL);
1035                         free(sc);
1036                 }
1037                 afb_cred_unref(client->cred);
1038                 afb_apiset_unref(client->apiset);
1039                 free(client);
1040         }
1041 }
1042
1043 static void api_ws_server_client_addref(struct api_ws_client *client)
1044 {
1045         __atomic_add_fetch(&client->refcount, 1, __ATOMIC_RELAXED);
1046 }
1047
1048 /* on call, propagate it to the ws service */
1049 static void api_ws_server_on_call(struct api_ws_client *client, struct readbuf *rb)
1050 {
1051         struct api_ws_server_req *wreq;
1052         char *cverb;
1053         const char *uuid, *verb;
1054         uint32_t flags, msgid;
1055         size_t lenverb;
1056         struct json_object *object;
1057
1058         api_ws_server_client_addref(client);
1059
1060         /* reads the call message data */
1061         if (!api_ws_read_uint32(rb, &msgid)
1062          || !api_ws_read_uint32(rb, &flags)
1063          || !api_ws_read_string(rb, &verb, &lenverb)
1064          || !api_ws_read_string(rb, &uuid, NULL)
1065          || !api_ws_read_object(rb, &object))
1066                 goto overflow;
1067
1068         /* create the request */
1069         wreq = malloc(++lenverb + sizeof *wreq);
1070         if (wreq == NULL)
1071                 goto out_of_memory;
1072
1073         afb_xreq_init(&wreq->xreq, &afb_api_ws_xreq_itf);
1074         wreq->client = client;
1075         wreq->msgid = msgid;
1076         cverb = (char*)&wreq[1];
1077         memcpy(cverb, verb, lenverb);
1078
1079         /* init the context */
1080         if (afb_context_connect(&wreq->xreq.context, uuid, NULL) < 0)
1081                 goto unconnected;
1082         wreq->xreq.context.flags = flags;
1083
1084         /* makes the call */
1085         wreq->xreq.cred = afb_cred_addref(client->cred);
1086         wreq->xreq.api = client->api;
1087         wreq->xreq.verb = cverb;
1088         wreq->xreq.json = object;
1089         afb_xreq_process(&wreq->xreq, client->apiset);
1090         return;
1091
1092 unconnected:
1093         free(wreq);
1094 out_of_memory:
1095         json_object_put(object);
1096 overflow:
1097         api_ws_server_client_unref(client);
1098 }
1099
1100 /* on subcall reply */
1101 static void api_ws_server_on_subcall_reply(struct api_ws_client *client, struct readbuf *rb)
1102 {
1103         char iserror;
1104         uint32_t subcallid;
1105         struct json_object *object;
1106         struct api_ws_subcall *sc, **psc;
1107
1108         /* reads the call message data */
1109         if (!api_ws_read_uint32(rb, &subcallid)
1110          || !api_ws_read_char(rb, &iserror)
1111          || !api_ws_read_object(rb, &object)) {
1112                 /* TODO bad protocol */
1113                 return;
1114         }
1115
1116         /* search the subcall and unlink it */
1117         pthread_mutex_lock(&client->mutex);
1118         psc = &client->subcalls;
1119         while ((sc = *psc) && sc->subcallid != subcallid)
1120                 psc = &sc->next;
1121         if (!sc) {
1122                 pthread_mutex_unlock(&client->mutex);
1123                 /* TODO subcall not found */
1124         } else {
1125                 *psc = sc->next;
1126                 pthread_mutex_unlock(&client->mutex);
1127                 sc->callback(sc->closure, (int)iserror, object);
1128                 free(sc);
1129         }
1130         json_object_put(object);
1131 }
1132
1133 /* callback when receiving binary data */
1134 static void api_ws_server_on_binary(void *closure, char *data, size_t size)
1135 {
1136         if (size > 0) {
1137                 struct readbuf rb = { .head = data, .end = data + size };
1138                 switch (*rb.head++) {
1139                 case CHAR_FOR_CALL:
1140                         api_ws_server_on_call(closure, &rb);
1141                         break;
1142                 case CHAR_FOR_SUBCALL_REPLY:
1143                         api_ws_server_on_subcall_reply(closure, &rb);
1144                         break;
1145                 default: /* unexpected message */
1146                         /* TODO: close the connection */
1147                         break;
1148                 }
1149         }
1150         free(data);
1151 }
1152
1153 /* callback when receiving a hangup */
1154 static void api_ws_server_on_hangup(void *closure)
1155 {
1156         struct api_ws_client *client = closure;
1157
1158         /* close the socket */
1159         if (client->fd >= 0) {
1160                 close(client->fd);
1161                 client->fd = -1;
1162         }
1163
1164         /* release the client */
1165         api_ws_server_client_unref(client);
1166 }
1167
1168 static void api_ws_server_accept(struct api_ws *api)
1169 {
1170         struct api_ws_client *client;
1171         struct sockaddr addr;
1172         socklen_t lenaddr;
1173
1174         client = calloc(1, sizeof *client);
1175         if (client != NULL) {
1176                 client->listener = afb_evt_listener_create(&api_ws_server_evt_itf, client);
1177                 if (client->listener != NULL) {
1178                         lenaddr = (socklen_t)sizeof addr;
1179                         client->fd = accept(api->fd, &addr, &lenaddr);
1180                         if (client->fd >= 0) {
1181                                 client->cred = afb_cred_create_for_socket(client->fd);
1182                                 fcntl(client->fd, F_SETFD, FD_CLOEXEC);
1183                                 fcntl(client->fd, F_SETFL, O_NONBLOCK);
1184                                 client->ws = afb_ws_create(afb_common_get_event_loop(), client->fd, &api_ws_server_ws_itf, client);
1185                                 if (client->ws != NULL) {
1186                                         client->api = api->api;
1187                                         client->apiset = afb_apiset_addref(api->server.apiset);
1188                                         client->refcount = 1;
1189                                         client->subcalls = NULL;
1190                                         return;
1191                                 }
1192                                 afb_cred_unref(client->cred);
1193                                 close(client->fd);
1194                         }
1195                         afb_evt_listener_unref(client->listener);
1196                 }
1197                 free(client);
1198         }
1199 }
1200
1201 /******************* server part: manage events **********************************/
1202
1203 static void api_ws_server_event_send(struct api_ws_client *client, char order, const char *event, int eventid, const char *data)
1204 {
1205         int rc;
1206         struct writebuf wb = { .count = 0 };
1207
1208         if (api_ws_write_char(&wb, order)
1209          && api_ws_write_uint32(&wb, eventid)
1210          && api_ws_write_string(&wb, event)
1211          && (data == NULL || api_ws_write_string(&wb, data))) {
1212                 rc = afb_ws_binary_v(client->ws, wb.iovec, wb.count);
1213                 if (rc >= 0)
1214                         return;
1215         }
1216         ERROR("error while sending %c for event %s", order, event);
1217 }
1218
1219 static void api_ws_server_event_add(void *closure, const char *event, int eventid)
1220 {
1221         api_ws_server_event_send(closure, CHAR_FOR_EVT_ADD, event, eventid, NULL);
1222 }
1223
1224 static void api_ws_server_event_remove(void *closure, const char *event, int eventid)
1225 {
1226         api_ws_server_event_send(closure, CHAR_FOR_EVT_DEL, event, eventid, NULL);
1227 }
1228
1229 static void api_ws_server_event_push(void *closure, const char *event, int eventid, struct json_object *object)
1230 {
1231         const char *data = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
1232         api_ws_server_event_send(closure, CHAR_FOR_EVT_PUSH, event, eventid, data ? : "null");
1233         json_object_put(object);
1234 }
1235
1236 static void api_ws_server_event_broadcast(void *closure, const char *event, int eventid, struct json_object *object)
1237 {
1238         int rc;
1239         struct api_ws_client *client = closure;
1240
1241         struct writebuf wb = { .count = 0 };
1242
1243         if (api_ws_write_char(&wb, CHAR_FOR_EVT_BROADCAST) && api_ws_write_string(&wb, event) && api_ws_write_object(&wb, object)) {
1244                 rc = afb_ws_binary_v(client->ws, wb.iovec, wb.count);
1245                 if (rc < 0)
1246                         ERROR("error while broadcasting event %s", event);
1247         } else
1248                 ERROR("error while broadcasting event %s", event);
1249         json_object_put(object);
1250 }
1251
1252 /******************* ws request part for server *****************/
1253
1254 /* decrement the reference count of the request and free/release it on falling to null */
1255 static void api_ws_server_req_destroy_cb(struct afb_xreq *xreq)
1256 {
1257         struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
1258
1259         afb_context_disconnect(&wreq->xreq.context);
1260         afb_cred_unref(wreq->xreq.cred);
1261         json_object_put(wreq->xreq.json);
1262         api_ws_server_client_unref(wreq->client);
1263         free(wreq);
1264 }
1265
1266 static void api_ws_server_req_success_cb(struct afb_xreq *xreq, struct json_object *obj, const char *info)
1267 {
1268         int rc;
1269         struct writebuf wb = { .count = 0 };
1270         struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
1271
1272         if (api_ws_write_char(&wb, CHAR_FOR_ANSWER_SUCCESS)
1273          && api_ws_write_uint32(&wb, wreq->msgid)
1274          && api_ws_write_uint32(&wb, (uint32_t)wreq->xreq.context.flags)
1275          && api_ws_write_string(&wb, info ? : "")
1276          && api_ws_write_object(&wb, obj)) {
1277                 rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1278                 if (rc >= 0)
1279                         goto success;
1280         }
1281         ERROR("error while sending success");
1282 success:
1283         json_object_put(obj);
1284 }
1285
1286 static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status, const char *info)
1287 {
1288         int rc;
1289         struct writebuf wb = { .count = 0 };
1290         struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
1291
1292         if (api_ws_write_char(&wb, CHAR_FOR_ANSWER_FAIL)
1293          && api_ws_write_uint32(&wb, wreq->msgid)
1294          && api_ws_write_uint32(&wb, (uint32_t)wreq->xreq.context.flags)
1295          && api_ws_write_string(&wb, status)
1296          && api_ws_write_string(&wb, info ? : "")) {
1297                 rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1298                 if (rc >= 0)
1299                         return;
1300         }
1301         ERROR("error while sending fail");
1302 }
1303
1304 static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure)
1305 {
1306         int rc;
1307         struct writebuf wb = { .count = 0 };
1308         struct api_ws_subcall *sc, *osc;
1309         struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
1310         struct api_ws_client *client = wreq->client;
1311
1312         sc = malloc(sizeof *sc);
1313         if (!sc) {
1314
1315         } else {
1316                 sc->callback = callback;
1317                 sc->closure = cb_closure;
1318
1319                 pthread_mutex_unlock(&client->mutex);
1320                 sc->subcallid = ptr2id(sc);
1321                 do {
1322                         sc->subcallid++;
1323                         osc = client->subcalls;
1324                         while(osc && osc->subcallid != sc->subcallid)
1325                                 osc = osc->next;
1326                 } while (osc);
1327                 sc->next = client->subcalls;
1328                 client->subcalls = sc;
1329                 pthread_mutex_unlock(&client->mutex);
1330
1331                 if (api_ws_write_char(&wb, CHAR_FOR_SUBCALL_CALL)
1332                  && api_ws_write_uint32(&wb, wreq->msgid)
1333                  && api_ws_write_uint32(&wb, sc->subcallid)
1334                  && api_ws_write_string(&wb, api)
1335                  && api_ws_write_string(&wb, verb)
1336                  && api_ws_write_object(&wb, args)) {
1337                         rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1338                         if (rc >= 0)
1339                                 return;
1340                 }
1341                 ERROR("error while sending fail");
1342         }
1343 }
1344
1345 static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event)
1346 {
1347         int rc, rc2;
1348         struct writebuf wb = { .count = 0 };
1349         struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
1350
1351         rc = afb_evt_add_watch(wreq->client->listener, event);
1352         if (rc < 0)
1353                 return rc;
1354
1355         if (api_ws_write_char(&wb, CHAR_FOR_EVT_SUBSCRIBE)
1356          && api_ws_write_uint32(&wb, wreq->msgid)
1357          && api_ws_write_uint32(&wb, (uint32_t)afb_evt_event_id(event))
1358          && api_ws_write_string(&wb, afb_evt_event_name(event))) {
1359                 rc2 = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1360                 if (rc2 >= 0)
1361                         goto success;
1362         }
1363         ERROR("error while subscribing event");
1364 success:
1365         return rc;
1366 }
1367
1368 static int api_ws_server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event)
1369 {
1370         int rc, rc2;
1371         struct writebuf wb = { .count = 0 };
1372         struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
1373
1374         if (api_ws_write_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE)
1375          && api_ws_write_uint32(&wb, wreq->msgid)
1376          && api_ws_write_uint32(&wb, (uint32_t)afb_evt_event_id(event))
1377          && api_ws_write_string(&wb, afb_evt_event_name(event))) {
1378                 rc2 = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
1379                 if (rc2 >= 0)
1380                         goto success;
1381         }
1382         ERROR("error while subscribing event");
1383 success:
1384         rc = afb_evt_remove_watch(wreq->client->listener, event);
1385         return rc;
1386 }
1387
1388 /******************* server part **********************************/
1389
1390 static int api_ws_server_connect(struct api_ws *api);
1391
1392 static int api_ws_server_listen_callback(sd_event_source *src, int fd, uint32_t revents, void *closure)
1393 {
1394         if ((revents & EPOLLIN) != 0)
1395                 api_ws_server_accept(closure);
1396         if ((revents & EPOLLHUP) != 0)
1397                 api_ws_server_connect(closure);
1398         return 0;
1399 }
1400
1401 static void api_ws_server_disconnect(struct api_ws *api)
1402 {
1403         if (api->server.listensrc != NULL) {
1404                 sd_event_source_unref(api->server.listensrc);
1405                 api->server.listensrc = NULL;
1406         }
1407         if (api->fd >= 0) {
1408                 close(api->fd);
1409                 api->fd = -1;
1410         }
1411 }
1412
1413 static int api_ws_server_connect(struct api_ws *api)
1414 {
1415         int rc;
1416
1417         /* ensure disconnected */
1418         api_ws_server_disconnect(api);
1419
1420         /* request the service object name */
1421         api->fd = api_ws_socket(api->path, 1);
1422         if (api->fd < 0)
1423                 ERROR("can't create socket %s", api->path);
1424         else {
1425                 /* listen for service */
1426                 rc = sd_event_add_io(afb_common_get_event_loop(), &api->server.listensrc, api->fd, EPOLLIN, api_ws_server_listen_callback, api);
1427                 if (rc >= 0)
1428                         return 0;
1429                 close(api->fd);
1430                 errno = -rc;
1431                 ERROR("can't add ws object %s", api->path);
1432         }
1433         return -1;
1434 }
1435
1436 /* create the service */
1437 int afb_api_ws_add_server(const char *path, struct afb_apiset *apiset)
1438 {
1439         int rc;
1440         struct api_ws *api;
1441
1442         /* creates the ws api object */
1443         api = api_ws_make(path);
1444         if (api == NULL)
1445                 goto error;
1446
1447         /* connect for serving */
1448         rc = api_ws_server_connect(api);
1449         if (rc < 0)
1450                 goto error2;
1451
1452         api->server.apiset = afb_apiset_addref(apiset);
1453         return 0;
1454
1455 error2:
1456         free(api);
1457 error:
1458         return -1;
1459 }
1460
1461