afb-stub-ws: improvements
[src/app-framework-binder.git] / src / afb-stub-ws.c
1 /*
2  * Copyright (C) 2015, 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #define NO_PLUGIN_VERBOSE_MACRO
21
22 #include <stdlib.h>
23 #include <string.h>
24 #include <assert.h>
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <errno.h>
28 #include <endian.h>
29 #include <netdb.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <sys/un.h>
33 #include <pthread.h>
34
35 #include <json-c/json.h>
36 #include <systemd/sd-event.h>
37
38 #include <afb/afb-event-itf.h>
39
40 #include "afb-common.h"
41
42 #include "afb-session.h"
43 #include "afb-cred.h"
44 #include "afb-ws.h"
45 #include "afb-msg-json.h"
46 #include "afb-api.h"
47 #include "afb-apiset.h"
48 #include "afb-stub-ws.h"
49 #include "afb-context.h"
50 #include "afb-evt.h"
51 #include "afb-xreq.h"
52 #include "verbose.h"
53 #include "jobs.h"
54
55 struct afb_stub_ws;
56
57 /************** constants for protocol definition *************************/
58
/* first byte of the messages emitted by the client side of the stub */
#define CHAR_FOR_CALL             'C'   /* call of a verb */
#define CHAR_FOR_SUBCALL_REPLY    'R'   /* reply to a subcall */
#define CHAR_FOR_DESCRIBE         'D'   /* request of a description */

/* first byte of the messages handled by the client side of the stub */
#define CHAR_FOR_ANSWER_SUCCESS   'T'   /* successful reply to a call */
#define CHAR_FOR_ANSWER_FAIL      'F'   /* failing reply to a call */
#define CHAR_FOR_EVT_BROADCAST    '*'   /* broadcast of an event */
#define CHAR_FOR_EVT_ADD          '+'   /* creation of an event */
#define CHAR_FOR_EVT_DEL          '-'   /* removal of an event */
#define CHAR_FOR_EVT_PUSH         '!'   /* push of an event */
#define CHAR_FOR_EVT_SUBSCRIBE    'S'   /* subscription of a request to an event */
#define CHAR_FOR_EVT_UNSUBSCRIBE  'U'   /* unsubscription of a request from an event */
#define CHAR_FOR_SUBCALL_CALL     'B'   /* subcall issued for a request */
#define CHAR_FOR_DESCRIPTION      'd'   /* reply to a description request */
72
73 /******************* handling subcalls *****************************/
74
/**
 * Server side record of one pending subcall.
 * The records of a stub are chained through 'next'.
 */
struct server_subcall {
        struct server_subcall *next;    /**< next pending subcall in the list */
        uint32_t subcallid;             /**< identifier sent with the subcall */
        void (*callback)(void*, int, struct json_object*); /**< function to call on completion */
        void *closure;                  /**< first argument given to the callback */
};
86
/**
 * Client side data needed for sending back the reply of a subcall.
 */
struct client_subcall {
        struct afb_stub_ws *stubws;     /**< stub used for sending the reply */
        uint32_t subcallid;             /**< identifier of the subcall replied */
};
95
/*
 * Client side record of an emitted call.
 * The records of a stub are chained through 'next'.
 */
struct client_call
{
        struct client_call *next;       /* next recorded call */
        struct afb_stub_ws *stubws;     /* stub that emitted the call */
        struct afb_xreq *xreq;          /* request at the origin of the call */
        uint32_t msgid;                 /* identifier of the message sent for the call */
};
105
106 /*
107  * structure for a ws request
108  */
109 struct server_req {
110         struct afb_xreq xreq;           /* the xreq */
111         struct afb_stub_ws *stubws;     /* the client of the request */
112         uint32_t msgid;                 /* the incoming request msgid */
113 };
114
115 /*
116  * structure for recording events on client side
117  */
118 struct client_event
119 {
120         struct client_event *next;
121         struct afb_event event;
122         int eventid;
123         int refcount;
124 };
125
/*
 * Client side record of a pending description request.
 * The records of a stub are chained through 'next'.
 */
struct client_describe {
        struct client_describe *next;   /* next pending description */
        struct afb_stub_ws *stubws;     /* stub emitting the request */
        struct jobloop *jobloop;        /* job loop to leave when the request ends */
        struct json_object *result;     /* the received description, if any */
        uint32_t descid;                /* identifier of the request */
};
137
/*
 * Data of a description job (server side).
 */
struct server_describe {
        struct afb_stub_ws *stubws;     /* stub that received the request */
        uint32_t descid;                /* identifier of the description request */
};
146
147 /******************* stub description for client or servers ******************/
148
/*
 * Description of a stub, shared by the client and the server sides.
 * Some fields are only meaningful for one of the sides, as noted below.
 */
struct afb_stub_ws {
        int refcount;                           /* count of references */
        int fd;                                 /* file descriptor */
        pthread_mutex_t mutex;                  /* resource control */
        struct afb_ws *ws;                      /* websocket */
        struct afb_evt_listener *listener;      /* listener for events (server side) */
        struct client_event *events;            /* event replica (client side) */
        struct client_call *calls;              /* emitted calls (client side) */
        struct afb_cred *cred;                  /* credentials (server side) */
        struct server_subcall *subcalls;        /* pending subcalls (server side) */
        struct client_describe *describes;      /* pending description (client side) */
        struct afb_apiset *apiset;              /* apiset */
        void (*on_hangup)(struct afb_stub_ws *);/* on hangup callback */

        /*
         * the api name
         * NOTE(review): declared with a single char; presumably the structure
         * is over-allocated to hold the full name -- confirm at allocation site
         */
        char apiname[1];
};
190
191 /******************* common useful tools **********************************/
192
/**
 * Translates a pointer to a 32 bits integer.
 *
 * The address is shifted right by 6 bits and truncated to 32 bits.
 * The conversion goes through the unsigned type uintptr_t because
 * right shifting a negative signed value is implementation-defined.
 *
 * @param ptr the pointer to translate
 * @return an integer derived from the address
 */
static inline uint32_t ptr2id(void *ptr)
{
        return (uint32_t)(((uintptr_t)ptr) >> 6);
}
202
203 /******************* serialisation part **********************************/
204
/*
 * Cursor over a received message.
 */
struct readbuf {
        char *head;     /* next byte to read */
        char *end;      /* one past the last byte of the message */
};
209
/* maximum count of parts in a message under construction */
#define WRITEBUF_COUNT_MAX  32

/*
 * Message under construction: a vector of parts to send.
 */
struct writebuf {
        struct iovec iovec[WRITEBUF_COUNT_MAX]; /* the parts to send */
        uint32_t uints[WRITEBUF_COUNT_MAX];     /* storage for the chars and integers of the parts */
        int count;                              /* count of parts currently set */
};
217
218 static char *readbuf_get(struct readbuf *rb, uint32_t length)
219 {
220         char *before = rb->head;
221         char *after = before + length;
222         if (after > rb->end)
223                 return 0;
224         rb->head = after;
225         return before;
226 }
227
228 static int readbuf_char(struct readbuf *rb, char *value)
229 {
230         if (rb->head >= rb->end)
231                 return 0;
232         *value = *rb->head++;
233         return 1;
234 }
235
236 static int readbuf_uint32(struct readbuf *rb, uint32_t *value)
237 {
238         char *after = rb->head + sizeof *value;
239         if (after > rb->end)
240                 return 0;
241         memcpy(value, rb->head, sizeof *value);
242         rb->head = after;
243         *value = le32toh(*value);
244         return 1;
245 }
246
247 static int readbuf_string(struct readbuf *rb, const char **value, size_t *length)
248 {
249         uint32_t len;
250         if (!readbuf_uint32(rb, &len) || !len)
251                 return 0;
252         if (length)
253                 *length = (size_t)(len - 1);
254         return (*value = readbuf_get(rb, len)) != NULL &&  rb->head[-1] == 0;
255 }
256
/*
 * Reads a JSON object, encoded as a string, from the buffer 'rb'.
 *
 * When the text can not be parsed and is not "null", the object
 * is a JSON string made of the text itself.
 *
 * Returns 1 and sets 'object' on success (possibly to NULL), or
 * returns 0 without touching 'object' when the string is unreadable.
 */
static int readbuf_object(struct readbuf *rb, struct json_object **object)
{
        const char *text;
        struct json_object *parsed;

        if (!readbuf_string(rb, &text, NULL))
                return 0;

        parsed = json_tokener_parse(text);
        if (parsed == NULL && strcmp(text, "null") != 0)
                parsed = json_object_new_string(text);
        *object = parsed;
        return 1;
}
270
271 static int writebuf_put(struct writebuf *wb, const void *value, size_t length)
272 {
273         int i = wb->count;
274         if (i == WRITEBUF_COUNT_MAX)
275                 return 0;
276         wb->iovec[i].iov_base = (void*)value;
277         wb->iovec[i].iov_len = length;
278         wb->count = i + 1;
279         return 1;
280 }
281
282 static int writebuf_char(struct writebuf *wb, char value)
283 {
284         int i = wb->count;
285         if (i == WRITEBUF_COUNT_MAX)
286                 return 0;
287         *(char*)&wb->uints[i] = value;
288         wb->iovec[i].iov_base = &wb->uints[i];
289         wb->iovec[i].iov_len = 1;
290         wb->count = i + 1;
291         return 1;
292 }
293
294 static int writebuf_uint32(struct writebuf *wb, uint32_t value)
295 {
296         int i = wb->count;
297         if (i == WRITEBUF_COUNT_MAX)
298                 return 0;
299         wb->uints[i] = htole32(value);
300         wb->iovec[i].iov_base = &wb->uints[i];
301         wb->iovec[i].iov_len = sizeof wb->uints[i];
302         wb->count = i + 1;
303         return 1;
304 }
305
/*
 * Appends to 'wb' the string 'value' of 'length' chars: a 32 bits size
 * (counting one more byte than 'length') followed by 'length + 1' bytes
 * taken at 'value'.
 * Returns 1 on success or 0 when the size does not fit 32 bits
 * or when 'wb' is full.
 */
static int writebuf_string_length(struct writebuf *wb, const char *value, size_t length)
{
        size_t total = length + 1;
        uint32_t len = (uint32_t)total;

        /* detects truncation to 32 bits and wrap around to zero */
        if ((size_t)len != total || len == 0)
                return 0;

        return writebuf_uint32(wb, len) && writebuf_put(wb, value, total);
}
311
/*
 * Appends to 'wb' the zero terminated string 'value'.
 * Returns 1 on success or 0 on failure.
 */
static int writebuf_string(struct writebuf *wb, const char *value)
{
        size_t length = strlen(value);

        return writebuf_string_length(wb, value, length);
}
316
317 static int writebuf_object(struct writebuf *wb, struct json_object *object)
318 {
319         const char *string = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
320         return string != NULL && writebuf_string(wb, string);
321 }
322
323 /******************* ws request part for server *****************/
324
325 /* decrement the reference count of the request and free/release it on falling to null */
326 static void server_req_destroy_cb(struct afb_xreq *xreq)
327 {
328         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
329
330         afb_context_disconnect(&wreq->xreq.context);
331         afb_cred_unref(wreq->xreq.cred);
332         json_object_put(wreq->xreq.json);
333         afb_stub_ws_unref(wreq->stubws);
334         free(wreq);
335 }
336
337 static void server_req_success_cb(struct afb_xreq *xreq, struct json_object *obj, const char *info)
338 {
339         int rc;
340         struct writebuf wb = { .count = 0 };
341         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
342
343         if (writebuf_char(&wb, CHAR_FOR_ANSWER_SUCCESS)
344          && writebuf_uint32(&wb, wreq->msgid)
345          && writebuf_uint32(&wb, (uint32_t)wreq->xreq.context.flags)
346          && writebuf_string(&wb, info ? : "")
347          && writebuf_object(&wb, obj)) {
348                 rc = afb_ws_binary_v(wreq->stubws->ws, wb.iovec, wb.count);
349                 if (rc >= 0)
350                         goto success;
351         }
352         ERROR("error while sending success");
353 success:
354         json_object_put(obj);
355 }
356
357 static void server_req_fail_cb(struct afb_xreq *xreq, const char *status, const char *info)
358 {
359         int rc;
360         struct writebuf wb = { .count = 0 };
361         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
362
363         if (writebuf_char(&wb, CHAR_FOR_ANSWER_FAIL)
364          && writebuf_uint32(&wb, wreq->msgid)
365          && writebuf_uint32(&wb, (uint32_t)wreq->xreq.context.flags)
366          && writebuf_string(&wb, status)
367          && writebuf_string(&wb, info ? : "")) {
368                 rc = afb_ws_binary_v(wreq->stubws->ws, wb.iovec, wb.count);
369                 if (rc >= 0)
370                         return;
371         }
372         ERROR("error while sending fail");
373 }
374
375 static void server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure)
376 {
377         int rc;
378         struct writebuf wb = { .count = 0 };
379         struct server_subcall *sc, *osc;
380         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
381         struct afb_stub_ws *stubws = wreq->stubws;
382
383         sc = malloc(sizeof *sc);
384         if (!sc) {
385                 callback(cb_closure, 1, afb_msg_json_internal_error());
386         } else {
387                 sc->callback = callback;
388                 sc->closure = cb_closure;
389
390                 pthread_mutex_unlock(&stubws->mutex);
391                 sc->subcallid = ptr2id(sc);
392                 do {
393                         sc->subcallid++;
394                         osc = stubws->subcalls;
395                         while(osc && osc->subcallid != sc->subcallid)
396                                 osc = osc->next;
397                 } while (osc);
398                 sc->next = stubws->subcalls;
399                 stubws->subcalls = sc;
400                 pthread_mutex_unlock(&stubws->mutex);
401
402                 if (writebuf_char(&wb, CHAR_FOR_SUBCALL_CALL)
403                  && writebuf_uint32(&wb, wreq->msgid)
404                  && writebuf_uint32(&wb, sc->subcallid)
405                  && writebuf_string(&wb, api)
406                  && writebuf_string(&wb, verb)
407                  && writebuf_object(&wb, args)) {
408                         rc = afb_ws_binary_v(wreq->stubws->ws, wb.iovec, wb.count);
409                         if (rc >= 0)
410                                 return;
411                 }
412                 ERROR("error while sending fail");
413         }
414 }
415
416 static int server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event)
417 {
418         int rc, rc2;
419         struct writebuf wb = { .count = 0 };
420         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
421
422         rc = afb_evt_add_watch(wreq->stubws->listener, event);
423         if (rc < 0)
424                 return rc;
425
426         if (writebuf_char(&wb, CHAR_FOR_EVT_SUBSCRIBE)
427          && writebuf_uint32(&wb, wreq->msgid)
428          && writebuf_uint32(&wb, (uint32_t)afb_evt_event_id(event))
429          && writebuf_string(&wb, afb_evt_event_name(event))) {
430                 rc2 = afb_ws_binary_v(wreq->stubws->ws, wb.iovec, wb.count);
431                 if (rc2 >= 0)
432                         goto success;
433         }
434         ERROR("error while subscribing event");
435 success:
436         return rc;
437 }
438
439 static int server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event)
440 {
441         int rc, rc2;
442         struct writebuf wb = { .count = 0 };
443         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
444
445         if (writebuf_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE)
446          && writebuf_uint32(&wb, wreq->msgid)
447          && writebuf_uint32(&wb, (uint32_t)afb_evt_event_id(event))
448          && writebuf_string(&wb, afb_evt_event_name(event))) {
449                 rc2 = afb_ws_binary_v(wreq->stubws->ws, wb.iovec, wb.count);
450                 if (rc2 >= 0)
451                         goto success;
452         }
453         ERROR("error while subscribing event");
454 success:
455         rc = afb_evt_remove_watch(wreq->stubws->listener, event);
456         return rc;
457 }
458
459 static const struct afb_xreq_query_itf server_req_xreq_itf = {
460         .success = server_req_success_cb,
461         .fail = server_req_fail_cb,
462         .unref = server_req_destroy_cb,
463         .subcall = server_req_subcall_cb,
464         .subscribe = server_req_subscribe_cb,
465         .unsubscribe = server_req_unsubscribe_cb
466 };
467
468 /******************* client part **********************************/
469
470 /* search a memorized call */
471 static struct client_call *client_call_search(struct afb_stub_ws *stubws, uint32_t msgid)
472 {
473         struct client_call *call;
474
475         call = stubws->calls;
476         while (call != NULL && call->msgid != msgid)
477                 call = call->next;
478
479         return call;
480 }
481
482 /* search the event */
483 static struct client_event *client_event_search(struct afb_stub_ws *stubws, uint32_t eventid, const char *name)
484 {
485         struct client_event *ev;
486
487         ev = stubws->events;
488         while (ev != NULL && (ev->eventid != eventid || 0 != strcmp(afb_evt_event_name(ev->event), name)))
489                 ev = ev->next;
490
491         return ev;
492 }
493
494
495 /* allocates and init the memorizing call */
496 static struct client_call *client_call_make(struct afb_stub_ws *stubws, struct afb_xreq *xreq)
497 {
498         struct client_call *call;
499
500         call = malloc(sizeof *call);
501         if (call != NULL) {
502                 afb_xreq_addref(xreq);
503                 call->xreq = xreq;
504                 call->msgid = ptr2id(call);
505                 while(client_call_search(stubws, call->msgid) != NULL)
506                         call->msgid++;
507                 call->stubws = stubws;
508                 call->next = stubws->calls;
509                 stubws->calls = call;
510         }
511         return call;
512 }
513
514 /* free and release the memorizing call */
515 static void client_call_destroy(struct client_call *call)
516 {
517         struct client_call **prv;
518
519         prv = &call->stubws->calls;
520         while (*prv != NULL) {
521                 if (*prv == call) {
522                         *prv = call->next;
523                         break;
524                 }
525                 prv = &(*prv)->next;
526         }
527
528         afb_xreq_unref(call->xreq);
529         free(call);
530 }
531
/*
 * Reads from 'rb' the data of an event: its identifier then its name.
 * Returns 1 on success or 0 when the message is invalid.
 */
static int client_msg_event_read(struct readbuf *rb, uint32_t *eventid, const char **name)
{
        if (!readbuf_uint32(rb, eventid))
                return 0;
        return readbuf_string(rb, name, NULL);
}
537
/*
 * Reads from 'rb' the data of an event and retrieves in 'ev'
 * the matching recorded event.
 * Returns 1 on success or 0, after logging an error, when the message
 * is invalid or when the event is not recorded.
 */
static int client_msg_event_get(struct afb_stub_ws *stubws, struct readbuf *rb, struct client_event **ev)
{
        uint32_t eventid;
        const char *name;
        struct client_event *found;

        /* get event data from the message */
        if (!client_msg_event_read(rb, &eventid, &name)) {
                ERROR("Invalid message");
                return 0;
        }

        /* get the recorded event */
        found = client_event_search(stubws, eventid, name);
        *ev = found;
        if (found != NULL)
                return 1;

        ERROR("event %s not found", name);
        return 0;
}
559
/*
 * Reads from 'rb' a message identifier and retrieves in 'call'
 * the matching recorded call.
 * Returns 1 on success or 0, after logging an error, when the message
 * is invalid or when no call is recorded for the identifier.
 */
static int client_msg_call_get(struct afb_stub_ws *stubws, struct readbuf *rb, struct client_call **call)
{
        uint32_t msgid;
        struct client_call *found;

        /* get the message identifier */
        if (!readbuf_uint32(rb, &msgid)) {
                ERROR("Invalid message");
                return 0;
        }

        /* get the recorded call */
        found = client_call_search(stubws, msgid);
        *call = found;
        if (found != NULL)
                return 1;

        ERROR("message not found");
        return 0;
}
580
/*
 * Reads from 'rb' a subscription message: retrieves the recorded call
 * then the recorded event. The event is not read if the call is not found.
 * Returns 1 on success or 0 on failure.
 */
static int client_msg_subscription_get(struct afb_stub_ws *stubws, struct readbuf *rb, struct client_call **call, struct client_event **ev)
{
        if (!client_msg_call_get(stubws, rb, call))
                return 0;
        return client_msg_event_get(stubws, rb, ev);
}
586
587 /* adds an event */
588 static void client_event_create(struct afb_stub_ws *stubws, struct readbuf *rb)
589 {
590         size_t offset;
591         const char *name;
592         uint32_t eventid;
593         struct client_event *ev;
594
595         /* get event data from the message */
596         offset = client_msg_event_read(rb, &eventid, &name);
597         if (offset == 0) {
598                 ERROR("Invalid message");
599                 return;
600         }
601
602         /* check conflicts */
603         ev = client_event_search(stubws, eventid, name);
604         if (ev != NULL) {
605                 ev->refcount++;
606                 return;
607         }
608
609         /* no conflict, try to add it */
610         ev = malloc(sizeof *ev);
611         if (ev != NULL) {
612                 ev->event = afb_evt_create_event(name);
613                 if (ev->event.closure == NULL)
614                         free(ev);
615                 else {
616                         ev->refcount = 1;
617                         ev->eventid = eventid;
618                         ev->next = stubws->events;
619                         stubws->events = ev;
620                         return;
621                 }
622         }
623         ERROR("can't create event %s, out of memory", name);
624 }
625
626 /* removes an event */
627 static void client_event_drop(struct afb_stub_ws *stubws, struct readbuf *rb)
628 {
629         struct client_event *ev, **prv;
630
631         /* retrieves the event */
632         if (!client_msg_event_get(stubws, rb, &ev))
633                 return;
634
635         /* decrease the reference count */
636         if (--ev->refcount)
637                 return;
638
639         /* unlinks the event */
640         prv = &stubws->events;
641         while (*prv != ev)
642                 prv = &(*prv)->next;
643         *prv = ev->next;
644
645         /* destroys the event */
646         afb_event_drop(ev->event);
647         free(ev);
648 }
649
650 /* subscribes an event */
651 static void client_event_subscribe(struct afb_stub_ws *stubws, struct readbuf *rb)
652 {
653         struct client_event *ev;
654         struct client_call *call;
655
656         if (client_msg_subscription_get(stubws, rb, &call, &ev)) {
657                 /* subscribe the request from the event */
658                 if (afb_xreq_subscribe(call->xreq, ev->event) < 0)
659                         ERROR("can't subscribe: %m");
660         }
661 }
662
663 /* unsubscribes an event */
664 static void client_event_unsubscribe(struct afb_stub_ws *stubws, struct readbuf *rb)
665 {
666         struct client_event *ev;
667         struct client_call *call;
668
669         if (client_msg_subscription_get(stubws, rb, &call, &ev)) {
670                 /* unsubscribe the request from the event */
671                 if (afb_xreq_unsubscribe(call->xreq, ev->event) < 0)
672                         ERROR("can't unsubscribe: %m");
673         }
674 }
675
/*
 * Handles the broadcast message of 'rb': reads the name of the event
 * and its object then broadcasts it locally.
 */
static void client_event_broadcast(struct afb_stub_ws *stubws, struct readbuf *rb)
{
        const char *event;
        struct json_object *object;

        if (!readbuf_string(rb, &event, NULL) || !readbuf_object(rb, &object)) {
                ERROR("unreadable broadcasted event");
                return;
        }
        afb_evt_broadcast(event, object);
}
687
688 /* pushs an event */
689 static void client_event_push(struct afb_stub_ws *stubws, struct readbuf *rb)
690 {
691         struct client_event *ev;
692         struct json_object *object;
693
694         if (client_msg_event_get(stubws, rb, &ev) && readbuf_object(rb, &object))
695                 afb_event_push(ev->event, object);
696         else
697                 ERROR("unreadable push event");
698 }
699
700 static void client_reply_success(struct afb_stub_ws *stubws, struct readbuf *rb)
701 {
702         struct client_call *call;
703         struct json_object *object;
704         const char *info;
705         uint32_t flags;
706
707         /* retrieve the message data */
708         if (!client_msg_call_get(stubws, rb, &call))
709                 return;
710
711         if (readbuf_uint32(rb, &flags)
712          && readbuf_string(rb, &info, NULL)
713          && readbuf_object(rb, &object)) {
714                 call->xreq->context.flags = (unsigned)flags;
715                 afb_xreq_success(call->xreq, object, *info ? info : NULL);
716         } else {
717                 /* failing to have the answer */
718                 afb_xreq_fail(call->xreq, "error", "ws error");
719         }
720         client_call_destroy(call);
721 }
722
723 static void client_reply_fail(struct afb_stub_ws *stubws, struct readbuf *rb)
724 {
725         struct client_call *call;
726         const char *info, *status;
727         uint32_t flags;
728
729         /* retrieve the message data */
730         if (!client_msg_call_get(stubws, rb, &call))
731                 return;
732
733         if (readbuf_uint32(rb, &flags)
734          && readbuf_string(rb, &status, NULL)
735          && readbuf_string(rb, &info, NULL)) {
736                 call->xreq->context.flags = (unsigned)flags;
737                 afb_xreq_fail(call->xreq, status, *info ? info : NULL);
738         } else {
739                 /* failing to have the answer */
740                 afb_xreq_fail(call->xreq, "error", "ws error");
741         }
742         client_call_destroy(call);
743 }
744
745 /* send a subcall reply */
746 static void client_send_subcall_reply(struct client_subcall *subcall, int status, json_object *object)
747 {
748         int rc;
749         struct writebuf wb = { .count = 0 };
750         char ie = status < 0;
751
752         if (!writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY)
753          || !writebuf_uint32(&wb, subcall->subcallid)
754          || !writebuf_char(&wb, ie)
755          || !writebuf_object(&wb, object)) {
756                 /* write error ? */
757                 return;
758         }
759
760         rc = afb_ws_binary_v(subcall->stubws->ws, wb.iovec, wb.count);
761         if (rc >= 0)
762                 return;
763         ERROR("error while sending subcall reply");
764 }
765
766 /* callback for subcall reply */
767 static void client_subcall_reply_cb(void *closure, int status, json_object *object)
768 {
769         client_send_subcall_reply(closure, status, object);
770         free(closure);
771 }
772
773 /* received a subcall request */
774 static void client_subcall(struct afb_stub_ws *stubws, struct readbuf *rb)
775 {
776         struct client_subcall *subcall;
777         struct client_call *call;
778         const char *api, *verb;
779         uint32_t subcallid;
780         struct json_object *object;
781
782         subcall = malloc(sizeof *subcall);
783         if (!subcall)
784                 return;
785
786         /* retrieve the message data */
787         if (!client_msg_call_get(stubws, rb, &call))
788                 return;
789
790         if (readbuf_uint32(rb, &subcallid)
791          && readbuf_string(rb, &api, NULL)
792          && readbuf_string(rb, &verb, NULL)
793          && readbuf_object(rb, &object)) {
794                 subcall->stubws = stubws;
795                 subcall->subcallid = subcallid;
796                 afb_xreq_subcall(call->xreq, api, verb, object, client_subcall_reply_cb, subcall);
797         }
798 }
799
800 /* pushs an event */
801 static void client_on_description(struct afb_stub_ws *stubws, struct readbuf *rb)
802 {
803         uint32_t descid;
804         struct client_describe *desc;
805         struct json_object *object;
806
807         if (!readbuf_uint32(rb, &descid))
808                 ERROR("unreadable description");
809         else {
810                 desc = stubws->describes;
811                 while (desc && desc->descid != descid)
812                         desc = desc->next;
813                 if (desc == NULL)
814                         ERROR("unexpected description");
815                 else {
816                         if (readbuf_object(rb, &object))
817                                 desc->result = object;
818                         else
819                                 ERROR("bad description");
820                         jobs_leave(desc->jobloop);
821                 }
822         }
823 }
824
825 /* callback when receiving binary data */
826 static void client_on_binary(void *closure, char *data, size_t size)
827 {
828         if (size > 0) {
829                 struct afb_stub_ws *stubws = closure;
830                 struct readbuf rb = { .head = data, .end = data + size };
831
832                 pthread_mutex_lock(&stubws->mutex);
833                 switch (*rb.head++) {
834                 case CHAR_FOR_ANSWER_SUCCESS: /* success */
835                         client_reply_success(stubws, &rb);
836                         break;
837                 case CHAR_FOR_ANSWER_FAIL: /* fail */
838                         client_reply_fail(stubws, &rb);
839                         break;
840                 case CHAR_FOR_EVT_BROADCAST: /* broadcast */
841                         client_event_broadcast(stubws, &rb);
842                         break;
843                 case CHAR_FOR_EVT_ADD: /* creates the event */
844                         client_event_create(stubws, &rb);
845                         break;
846                 case CHAR_FOR_EVT_DEL: /* drops the event */
847                         client_event_drop(stubws, &rb);
848                         break;
849                 case CHAR_FOR_EVT_PUSH: /* pushs the event */
850                         client_event_push(stubws, &rb);
851                         break;
852                 case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
853                         client_event_subscribe(stubws, &rb);
854                         break;
855                 case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
856                         client_event_unsubscribe(stubws, &rb);
857                         break;
858                 case CHAR_FOR_SUBCALL_CALL: /* subcall */
859                         client_subcall(stubws, &rb);
860                         break;
861                 case CHAR_FOR_DESCRIPTION: /* description */
862                         client_on_description(stubws, &rb);
863                         break;
864                 default: /* unexpected message */
865                         /* TODO: close the connection */
866                         break;
867                 }
868                 pthread_mutex_unlock(&stubws->mutex);
869         }
870         free(data);
871 }
872
873 /* on call, propagate it to the ws service */
874 static void client_call_cb(void * closure, struct afb_xreq *xreq)
875 {
876         int rc;
877         struct client_call *call;
878         struct writebuf wb = { .count = 0 };
879         const char *raw;
880         size_t szraw;
881         struct afb_stub_ws *stubws = closure;
882
883         pthread_mutex_lock(&stubws->mutex);
884
885         /* create the recording data */
886         call = client_call_make(stubws, xreq);
887         if (call == NULL) {
888                 afb_xreq_fail_f(xreq, "error", "out of memory");
889                 goto end;
890         }
891
892         /* creates the call message */
893         raw = afb_xreq_raw(xreq, &szraw);
894         if (raw == NULL)
895                 goto internal_error;
896         if (!writebuf_char(&wb, CHAR_FOR_CALL)
897          || !writebuf_uint32(&wb, call->msgid)
898          || !writebuf_uint32(&wb, (uint32_t)xreq->context.flags)
899          || !writebuf_string(&wb, xreq->verb)
900          || !writebuf_string(&wb, afb_session_uuid(xreq->context.session))
901          || !writebuf_string_length(&wb, raw, szraw))
902                 goto overflow;
903
904         /* send */
905         rc = afb_ws_binary_v(stubws->ws, wb.iovec, wb.count);
906         if (rc >= 0)
907                 goto end;
908
909         afb_xreq_fail(xreq, "error", "websocket sending error");
910         goto clean_call;
911
912 internal_error:
913         afb_xreq_fail(xreq, "error", "internal: raw is NULL!");
914         goto clean_call;
915
916 overflow:
917         afb_xreq_fail(xreq, "error", "overflow: size doesn't match 32 bits!");
918
919 clean_call:
920         client_call_destroy(call);
921 end:
922         pthread_mutex_unlock(&stubws->mutex);
923 }
924
925 static void client_send_describe_cb(int signum, void *closure, struct jobloop *jobloop)
926 {
927         struct client_describe *desc = closure;
928         struct writebuf wb = { .count = 0 };
929
930         if (!signum) {
931                 /* record the jobloop */
932                 desc->jobloop = jobloop;
933
934                 /* send */
935                 if (writebuf_char(&wb, CHAR_FOR_DESCRIBE)
936                  && writebuf_uint32(&wb, desc->descid)
937                  && afb_ws_binary_v(desc->stubws->ws, wb.iovec, wb.count) >= 0)
938                         return;
939         }
940         jobs_leave(jobloop);
941 }
942
943 /* get the description */
944 static struct json_object *client_describe_cb(void * closure)
945 {
946         struct client_describe desc, *d;
947         struct afb_stub_ws *stubws = closure;
948
949         /* fill in stack the description of the task */
950         pthread_mutex_lock(&stubws->mutex);
951         desc.result = NULL;
952         desc.descid = ptr2id(&desc);
953         d = stubws->describes;
954         while (d) {
955                 if (d->descid != desc.descid)
956                         d = d->next;
957                 else {
958                         desc.descid++;
959                         d = stubws->describes;
960                 }
961         }
962         desc.stubws = stubws;
963         desc.next = stubws->describes;
964         stubws->describes = &desc;
965         pthread_mutex_unlock(&stubws->mutex);
966
967         /* synchronous job: send the request and wait its result */
968         jobs_enter(NULL, 0, client_send_describe_cb, &desc);
969
970         /* unlink and send the result */
971         pthread_mutex_lock(&stubws->mutex);
972         d = stubws->describes;
973         if (d == &desc)
974                 stubws->describes = desc.next;
975         else {
976                 while (d) {
977                         if (d->next != &desc)
978                                 d = d->next;
979                         else {
980                                 d->next = desc.next;
981                                 d = NULL;
982                         }
983                 }
984         }
985         pthread_mutex_unlock(&stubws->mutex);
986         return desc.result;
987 }
988
989 /******************* client description part for server *****************************/
990
991 /* on call, propagate it to the ws service */
992 static void server_on_call(struct afb_stub_ws *stubws, struct readbuf *rb)
993 {
994         struct server_req *wreq;
995         char *cverb;
996         const char *uuid, *verb;
997         uint32_t flags, msgid;
998         size_t lenverb;
999         struct json_object *object;
1000
1001         afb_stub_ws_addref(stubws);
1002
1003         /* reads the call message data */
1004         if (!readbuf_uint32(rb, &msgid)
1005          || !readbuf_uint32(rb, &flags)
1006          || !readbuf_string(rb, &verb, &lenverb)
1007          || !readbuf_string(rb, &uuid, NULL)
1008          || !readbuf_object(rb, &object))
1009                 goto overflow;
1010
1011         /* create the request */
1012         wreq = malloc(++lenverb + sizeof *wreq);
1013         if (wreq == NULL)
1014                 goto out_of_memory;
1015
1016         afb_xreq_init(&wreq->xreq, &server_req_xreq_itf);
1017         wreq->stubws = stubws;
1018         wreq->msgid = msgid;
1019         cverb = (char*)&wreq[1];
1020         memcpy(cverb, verb, lenverb);
1021
1022         /* init the context */
1023         if (afb_context_connect(&wreq->xreq.context, uuid, NULL) < 0)
1024                 goto unconnected;
1025         wreq->xreq.context.flags = flags;
1026
1027         /* makes the call */
1028         wreq->xreq.cred = afb_cred_addref(stubws->cred);
1029         wreq->xreq.api = stubws->apiname;
1030         wreq->xreq.verb = cverb;
1031         wreq->xreq.json = object;
1032         afb_xreq_process(&wreq->xreq, stubws->apiset);
1033         return;
1034
1035 unconnected:
1036         free(wreq);
1037 out_of_memory:
1038         json_object_put(object);
1039 overflow:
1040         afb_stub_ws_unref(stubws);
1041 }
1042
1043 /* on subcall reply */
1044 static void server_on_subcall_reply(struct afb_stub_ws *stubws, struct readbuf *rb)
1045 {
1046         char ie;
1047         uint32_t subcallid;
1048         struct json_object *object;
1049         struct server_subcall *sc, **psc;
1050
1051         /* reads the call message data */
1052         if (!readbuf_uint32(rb, &subcallid)
1053          || !readbuf_char(rb, &ie)
1054          || !readbuf_object(rb, &object)) {
1055                 /* TODO bad protocol */
1056                 return;
1057         }
1058
1059         /* search the subcall and unlink it */
1060         pthread_mutex_lock(&stubws->mutex);
1061         psc = &stubws->subcalls;
1062         while ((sc = *psc) && sc->subcallid != subcallid)
1063                 psc = &sc->next;
1064         if (!sc) {
1065                 pthread_mutex_unlock(&stubws->mutex);
1066                 /* TODO subcall not found */
1067         } else {
1068                 *psc = sc->next;
1069                 pthread_mutex_unlock(&stubws->mutex);
1070                 sc->callback(sc->closure, -(int)ie, object);
1071                 free(sc);
1072         }
1073         json_object_put(object);
1074 }
1075
1076 static void server_send_description(struct afb_stub_ws *stubws, uint32_t descid, struct json_object *descobj)
1077 {
1078         struct writebuf wb = { .count = 0 };
1079
1080         if (!writebuf_char(&wb, CHAR_FOR_DESCRIPTION)
1081          || !writebuf_uint32(&wb, descid)
1082          || !writebuf_object(&wb, descobj)
1083          || afb_ws_binary_v(stubws->ws, wb.iovec, wb.count) < 0)
1084                 ERROR("can't send description");
1085 }
1086
1087 static void server_describe_job(int signum, void *closure)
1088 {
1089         struct json_object *obj;
1090         struct server_describe *desc = closure;
1091
1092         /* get the description if possible */
1093         obj = !signum ? afb_apiset_describe(desc->stubws->apiset, desc->stubws->apiname) : NULL;
1094
1095         /* send it */
1096         server_send_description(desc->stubws, desc->descid, obj);
1097         json_object_put(obj);
1098         afb_stub_ws_unref(desc->stubws);
1099         free(desc);
1100 }
1101
1102 /* on describe, propagate it to the ws service */
1103 static void server_on_describe(struct afb_stub_ws *stubws, struct readbuf *rb)
1104 {
1105
1106         uint32_t descid;
1107         struct server_describe *desc;
1108
1109         /* reads the descid */
1110         if (readbuf_uint32(rb, &descid)) {
1111                 /* create asynchronous job */
1112                 desc = malloc(sizeof *desc);
1113                 if (desc) {
1114                         desc->descid = descid;
1115                         desc->stubws = stubws;
1116                         afb_stub_ws_addref(stubws);
1117                         if (jobs_queue(NULL, 0, server_describe_job, desc) < 0)
1118                                 server_describe_job(0, desc);
1119                         return;
1120                 }
1121                 server_send_description(stubws, descid, NULL);
1122         }
1123         ERROR("can't provide description");
1124 }
1125
1126 /* callback when receiving binary data */
1127 static void server_on_binary(void *closure, char *data, size_t size)
1128 {
1129         if (size > 0) {
1130                 struct readbuf rb = { .head = data, .end = data + size };
1131                 switch (*rb.head++) {
1132                 case CHAR_FOR_CALL:
1133                         server_on_call(closure, &rb);
1134                         break;
1135                 case CHAR_FOR_SUBCALL_REPLY:
1136                         server_on_subcall_reply(closure, &rb);
1137                         break;
1138                 case CHAR_FOR_DESCRIBE:
1139                         server_on_describe(closure, &rb);
1140                         break;
1141                 default: /* unexpected message */
1142                         /* TODO: close the connection */
1143                         break;
1144                 }
1145         }
1146         free(data);
1147 }
1148
1149 /******************* server part: manage events **********************************/
1150
1151 static void server_event_send(struct afb_stub_ws *stubws, char order, const char *event, int eventid, const char *data)
1152 {
1153         int rc;
1154         struct writebuf wb = { .count = 0 };
1155
1156         if (writebuf_char(&wb, order)
1157          && writebuf_uint32(&wb, eventid)
1158          && writebuf_string(&wb, event)
1159          && (data == NULL || writebuf_string(&wb, data))) {
1160                 rc = afb_ws_binary_v(stubws->ws, wb.iovec, wb.count);
1161                 if (rc >= 0)
1162                         return;
1163         }
1164         ERROR("error while sending %c for event %s", order, event);
1165 }
1166
1167 static void server_event_add(void *closure, const char *event, int eventid)
1168 {
1169         server_event_send(closure, CHAR_FOR_EVT_ADD, event, eventid, NULL);
1170 }
1171
1172 static void server_event_remove(void *closure, const char *event, int eventid)
1173 {
1174         server_event_send(closure, CHAR_FOR_EVT_DEL, event, eventid, NULL);
1175 }
1176
1177 static void server_event_push(void *closure, const char *event, int eventid, struct json_object *object)
1178 {
1179         const char *data = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
1180         server_event_send(closure, CHAR_FOR_EVT_PUSH, event, eventid, data ? : "null");
1181         json_object_put(object);
1182 }
1183
1184 static void server_event_broadcast(void *closure, const char *event, int eventid, struct json_object *object)
1185 {
1186         int rc;
1187         struct afb_stub_ws *stubws = closure;
1188
1189         struct writebuf wb = { .count = 0 };
1190
1191         if (writebuf_char(&wb, CHAR_FOR_EVT_BROADCAST) && writebuf_string(&wb, event) && writebuf_object(&wb, object)) {
1192                 rc = afb_ws_binary_v(stubws->ws, wb.iovec, wb.count);
1193                 if (rc < 0)
1194                         ERROR("error while broadcasting event %s", event);
1195         } else
1196                 ERROR("error while broadcasting event %s", event);
1197         json_object_put(object);
1198 }
1199
1200 /*****************************************************/
1201
1202 /* callback when receiving a hangup */
1203 static void server_on_hangup(void *closure)
1204 {
1205         struct afb_stub_ws *stubws = closure;
1206
1207         /* close the socket */
1208         if (stubws->fd >= 0) {
1209                 close(stubws->fd);
1210                 stubws->fd = -1;
1211                 if (stubws->on_hangup)
1212                         stubws->on_hangup(stubws);
1213         }
1214
1215         /* release the client */
1216         afb_stub_ws_unref(stubws);
1217 }
1218
1219 /*****************************************************/
1220
1221 /* the interface for events pushing */
1222 static const struct afb_evt_itf server_evt_itf = {
1223         .broadcast = server_event_broadcast,
1224         .push = server_event_push,
1225         .add = server_event_add,
1226         .remove = server_event_remove
1227 };
1228
1229 static const struct afb_ws_itf stub_ws_client_ws_itf =
1230 {
1231         .on_close = NULL,
1232         .on_text = NULL,
1233         .on_binary = client_on_binary,
1234         .on_error = NULL,
1235         .on_hangup = NULL
1236 };
1237
1238 static const struct afb_ws_itf server_ws_itf =
1239 {
1240         .on_close = NULL,
1241         .on_text = NULL,
1242         .on_binary = server_on_binary,
1243         .on_error = NULL,
1244         .on_hangup = server_on_hangup
1245 };
1246
1247 static struct afb_api_itf ws_api_itf = {
1248         .call = client_call_cb,
1249         .describe = client_describe_cb
1250 };
1251
1252 /*****************************************************/
1253
1254 static struct afb_stub_ws *afb_stub_ws_create(int fd, const char *apiname, struct afb_apiset *apiset, const struct afb_ws_itf *itf)
1255 {
1256         struct afb_stub_ws *stubws;
1257
1258         stubws = calloc(1, sizeof *stubws + strlen(apiname));
1259         if (stubws == NULL)
1260                 errno = ENOMEM;
1261         else {
1262                 fcntl(fd, F_SETFD, FD_CLOEXEC);
1263                 fcntl(fd, F_SETFL, O_NONBLOCK);
1264                 stubws->ws = afb_ws_create(afb_common_get_event_loop(), fd, itf, stubws);
1265                 if (stubws->ws != NULL) {
1266                         stubws->fd = fd;
1267                         strcpy(stubws->apiname, apiname);
1268                         stubws->apiset = afb_apiset_addref(apiset);
1269                         stubws->refcount = 1;
1270                         stubws->subcalls = NULL;
1271                         return stubws;
1272                 }
1273                 free(stubws);
1274         }
1275         return NULL;
1276 }
1277
1278 struct afb_stub_ws *afb_stub_ws_create_client(int fd, const char *apiname, struct afb_apiset *apiset)
1279 {
1280         return afb_stub_ws_create(fd, apiname, apiset, &stub_ws_client_ws_itf);
1281 }
1282
1283 struct afb_stub_ws *afb_stub_ws_create_server(int fd, const char *apiname, struct afb_apiset *apiset)
1284 {
1285         struct afb_stub_ws *stubws;
1286
1287         stubws = afb_stub_ws_create(fd, apiname, apiset, &server_ws_itf);
1288         if (stubws) {
1289                 stubws->cred = afb_cred_create_for_socket(fd);
1290                 stubws->listener = afb_evt_listener_create(&server_evt_itf, stubws);
1291                 if (stubws->listener != NULL)
1292                         return stubws;
1293                 afb_stub_ws_unref(stubws);
1294         }
1295         return NULL;
1296 }
1297
1298 void afb_stub_ws_unref(struct afb_stub_ws *stubws)
1299 {
1300         struct server_subcall *sc, *nsc;
1301
1302         if (!__atomic_sub_fetch(&stubws->refcount, 1, __ATOMIC_RELAXED)) {
1303                 afb_evt_listener_unref(stubws->listener);
1304                 afb_ws_destroy(stubws->ws);
1305                 nsc = stubws->subcalls;
1306                 while (nsc) {
1307                         sc= nsc;
1308                         nsc = sc->next;
1309                         sc->callback(sc->closure, 1, NULL);
1310                         free(sc);
1311                 }
1312                 afb_cred_unref(stubws->cred);
1313                 afb_apiset_unref(stubws->apiset);
1314                 free(stubws);
1315         }
1316 }
1317
1318 void afb_stub_ws_addref(struct afb_stub_ws *stubws)
1319 {
1320         __atomic_add_fetch(&stubws->refcount, 1, __ATOMIC_RELAXED);
1321 }
1322
1323 void afb_stub_ws_on_hangup(struct afb_stub_ws *stubws, void (*on_hangup)(struct afb_stub_ws*))
1324 {
1325         stubws->on_hangup = on_hangup;
1326 }
1327
1328 const char *afb_stub_ws_name(struct afb_stub_ws *stubws)
1329 {
1330         return stubws->apiname;
1331 }
1332
1333 struct afb_api afb_stub_ws_client_api(struct afb_stub_ws *stubws)
1334 {
1335         struct afb_api api;
1336
1337         assert(!stubws->listener); /* check client */
1338         api.closure = stubws;
1339         api.itf = &ws_api_itf;
1340         return api;
1341 }
1342
1343 int afb_stub_ws_client_add(struct afb_stub_ws *stubws, struct afb_apiset *apiset)
1344 {
1345         return afb_apiset_add(apiset, stubws->apiname, afb_stub_ws_client_api(stubws));
1346 }
1347