Make status common
[src/app-framework-binder.git] / src / afb-stub-ws.c
1 /*
2  * Copyright (C) 2015, 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #define NO_PLUGIN_VERBOSE_MACRO
21
22 #include <stdlib.h>
23 #include <string.h>
24 #include <assert.h>
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <errno.h>
28 #include <endian.h>
29 #include <netdb.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <sys/un.h>
33 #include <pthread.h>
34
35 #include <json-c/json.h>
36 #include <systemd/sd-event.h>
37
38 #include <afb/afb-event-itf.h>
39
40 #include "afb-common.h"
41
42 #include "afb-session.h"
43 #include "afb-cred.h"
44 #include "afb-ws.h"
45 #include "afb-msg-json.h"
46 #include "afb-api.h"
47 #include "afb-apiset.h"
48 #include "afb-stub-ws.h"
49 #include "afb-context.h"
50 #include "afb-evt.h"
51 #include "afb-xreq.h"
52 #include "verbose.h"
53 #include "jobs.h"
54
55 struct afb_stub_ws;
56
57 /************** constants for protocol definition *************************/
58
59 #define CHAR_FOR_CALL             'C'
60 #define CHAR_FOR_ANSWER_SUCCESS   'T'
61 #define CHAR_FOR_ANSWER_FAIL      'F'
62 #define CHAR_FOR_EVT_BROADCAST    '*'
63 #define CHAR_FOR_EVT_ADD          '+'
64 #define CHAR_FOR_EVT_DEL          '-'
65 #define CHAR_FOR_EVT_PUSH         '!'
66 #define CHAR_FOR_EVT_SUBSCRIBE    'S'
67 #define CHAR_FOR_EVT_UNSUBSCRIBE  'U'
68 #define CHAR_FOR_SUBCALL_CALL     'B'
69 #define CHAR_FOR_SUBCALL_REPLY    'R'
70 #define CHAR_FOR_DESCRIBE         'D'
71 #define CHAR_FOR_DESCRIPTION      'd'
72
73 /******************* handling subcalls *****************************/
74
75 /**
76  * Structure on server side for recording pending
77  * subcalls.
78  */
79 struct server_subcall
80 {
81         struct server_subcall *next;    /**< next subcall for the client */
82         uint32_t subcallid;             /**< the subcallid */
83         void (*callback)(void*, int, struct json_object*); /**< callback on completion */
84         void *closure;                  /**< closure of the callback */
85 };
86
87 /**
88  * Structure for sending back replies on client side
89  */
90 struct client_subcall
91 {
92         struct afb_stub_ws *stubws;     /**< stub descriptor */
93         uint32_t subcallid;             /**< subcallid for the reply */
94 };
95
96 /*
97  * structure for recording calls on client side
98  */
99 struct client_call {
100         struct client_call *next;       /* the next call */
101         struct afb_stub_ws *stubws;     /* the stub_ws */
102         struct afb_xreq *xreq;          /* the request handle */
103         uint32_t msgid;                 /* the message identifier */
104 };
105
106 /*
107  * structure for a ws request
108  */
109 struct server_req {
110         struct afb_xreq xreq;           /* the xreq */
111         struct afb_stub_ws *stubws;     /* the client of the request */
112         uint32_t msgid;                 /* the incoming request msgid */
113 };
114
115 /*
116  * structure for recording events on client side
117  */
118 struct client_event
119 {
120         struct client_event *next;
121         struct afb_event event;
122         int eventid;
123         int refcount;
124 };
125
126 /*
127  * structure for recording describe requests
128  */
129 struct client_describe
130 {
131         struct client_describe *next;
132         struct afb_stub_ws *stubws;
133         struct jobloop *jobloop;
134         struct json_object *result;
135         uint32_t descid;
136 };
137
138 /*
139  * structure for jobs of describing
140  */
141 struct server_describe
142 {
143         struct afb_stub_ws *stubws;
144         uint32_t descid;
145 };
146
147 /******************* client description part for server *****************************/
148
149 struct afb_stub_ws
150 {
151         /* count of references */
152         int refcount;
153
154         /* file descriptor */
155         int fd;
156
157         /* resource control */
158         pthread_mutex_t mutex;
159
160         /* websocket */
161         struct afb_ws *ws;
162
163         /* listener for events (server side) */
164         struct afb_evt_listener *listener;
165
166         /* event replica (client side) */
167         struct client_event *events;
168
169         /* emitted calls (client side) */
170         struct client_call *calls;
171
172         /* credentials (server side) */
173         struct afb_cred *cred;
174
175         /* pending subcalls (server side) */
176         struct server_subcall *subcalls;
177
178         /* pending description (client side) */
179         struct client_describe *describes;
180
181         /* apiset */
182         struct afb_apiset *apiset;
183
184         /* the api name */
185         char apiname[1];
186 };
187
188 /******************* common useful tools **********************************/
189
190 /**
191  * translate a pointer to some integer
192  * @param ptr the pointer to translate
193  * @return an integer
194  */
195 static inline uint32_t ptr2id(void *ptr)
196 {
197         return (uint32_t)(((intptr_t)ptr) >> 6);
198 }
199
200 /******************* serialisation part **********************************/
201
202 struct readbuf
203 {
204         char *head, *end;
205 };
206
207 #define WRITEBUF_COUNT_MAX  32
208 struct writebuf
209 {
210         struct iovec iovec[WRITEBUF_COUNT_MAX];
211         uint32_t uints[WRITEBUF_COUNT_MAX];
212         int count;
213 };
214
215 static char *readbuf_get(struct readbuf *rb, uint32_t length)
216 {
217         char *before = rb->head;
218         char *after = before + length;
219         if (after > rb->end)
220                 return 0;
221         rb->head = after;
222         return before;
223 }
224
225 static int readbuf_char(struct readbuf *rb, char *value)
226 {
227         if (rb->head >= rb->end)
228                 return 0;
229         *value = *rb->head++;
230         return 1;
231 }
232
233 static int readbuf_uint32(struct readbuf *rb, uint32_t *value)
234 {
235         char *after = rb->head + sizeof *value;
236         if (after > rb->end)
237                 return 0;
238         memcpy(value, rb->head, sizeof *value);
239         rb->head = after;
240         *value = le32toh(*value);
241         return 1;
242 }
243
244 static int readbuf_string(struct readbuf *rb, const char **value, size_t *length)
245 {
246         uint32_t len;
247         if (!readbuf_uint32(rb, &len) || !len)
248                 return 0;
249         if (length)
250                 *length = (size_t)(len - 1);
251         return (*value = readbuf_get(rb, len)) != NULL &&  rb->head[-1] == 0;
252 }
253
254 static int readbuf_object(struct readbuf *rb, struct json_object **object)
255 {
256         const char *string;
257         struct json_object *o;
258         int rc = readbuf_string(rb, &string, NULL);
259         if (rc) {
260                 o = json_tokener_parse(string);
261                 if (o == NULL && strcmp(string, "null"))
262                         o = json_object_new_string(string);
263                 *object = o;
264         }
265         return rc;
266 }
267
268 static int writebuf_put(struct writebuf *wb, const void *value, size_t length)
269 {
270         int i = wb->count;
271         if (i == WRITEBUF_COUNT_MAX)
272                 return 0;
273         wb->iovec[i].iov_base = (void*)value;
274         wb->iovec[i].iov_len = length;
275         wb->count = i + 1;
276         return 1;
277 }
278
279 static int writebuf_char(struct writebuf *wb, char value)
280 {
281         int i = wb->count;
282         if (i == WRITEBUF_COUNT_MAX)
283                 return 0;
284         *(char*)&wb->uints[i] = value;
285         wb->iovec[i].iov_base = &wb->uints[i];
286         wb->iovec[i].iov_len = 1;
287         wb->count = i + 1;
288         return 1;
289 }
290
291 static int writebuf_uint32(struct writebuf *wb, uint32_t value)
292 {
293         int i = wb->count;
294         if (i == WRITEBUF_COUNT_MAX)
295                 return 0;
296         wb->uints[i] = htole32(value);
297         wb->iovec[i].iov_base = &wb->uints[i];
298         wb->iovec[i].iov_len = sizeof wb->uints[i];
299         wb->count = i + 1;
300         return 1;
301 }
302
303 static int writebuf_string_length(struct writebuf *wb, const char *value, size_t length)
304 {
305         uint32_t len = (uint32_t)++length;
306         return (size_t)len == length && len && writebuf_uint32(wb, len) && writebuf_put(wb, value, length);
307 }
308
309 static int writebuf_string(struct writebuf *wb, const char *value)
310 {
311         return writebuf_string_length(wb, value, strlen(value));
312 }
313
314 static int writebuf_object(struct writebuf *wb, struct json_object *object)
315 {
316         const char *string = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
317         return string != NULL && writebuf_string(wb, string);
318 }
319
320 /******************* ws request part for server *****************/
321
322 /* decrement the reference count of the request and free/release it on falling to null */
323 static void server_req_destroy_cb(struct afb_xreq *xreq)
324 {
325         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
326
327         afb_context_disconnect(&wreq->xreq.context);
328         afb_cred_unref(wreq->xreq.cred);
329         json_object_put(wreq->xreq.json);
330         afb_stub_ws_unref(wreq->stubws);
331         free(wreq);
332 }
333
334 static void server_req_success_cb(struct afb_xreq *xreq, struct json_object *obj, const char *info)
335 {
336         int rc;
337         struct writebuf wb = { .count = 0 };
338         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
339
340         if (writebuf_char(&wb, CHAR_FOR_ANSWER_SUCCESS)
341          && writebuf_uint32(&wb, wreq->msgid)
342          && writebuf_uint32(&wb, (uint32_t)wreq->xreq.context.flags)
343          && writebuf_string(&wb, info ? : "")
344          && writebuf_object(&wb, obj)) {
345                 rc = afb_ws_binary_v(wreq->stubws->ws, wb.iovec, wb.count);
346                 if (rc >= 0)
347                         goto success;
348         }
349         ERROR("error while sending success");
350 success:
351         json_object_put(obj);
352 }
353
354 static void server_req_fail_cb(struct afb_xreq *xreq, const char *status, const char *info)
355 {
356         int rc;
357         struct writebuf wb = { .count = 0 };
358         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
359
360         if (writebuf_char(&wb, CHAR_FOR_ANSWER_FAIL)
361          && writebuf_uint32(&wb, wreq->msgid)
362          && writebuf_uint32(&wb, (uint32_t)wreq->xreq.context.flags)
363          && writebuf_string(&wb, status)
364          && writebuf_string(&wb, info ? : "")) {
365                 rc = afb_ws_binary_v(wreq->stubws->ws, wb.iovec, wb.count);
366                 if (rc >= 0)
367                         return;
368         }
369         ERROR("error while sending fail");
370 }
371
372 static void server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure)
373 {
374         int rc;
375         struct writebuf wb = { .count = 0 };
376         struct server_subcall *sc, *osc;
377         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
378         struct afb_stub_ws *stubws = wreq->stubws;
379
380         sc = malloc(sizeof *sc);
381         if (!sc) {
382                 callback(cb_closure, 1, afb_msg_json_internal_error());
383         } else {
384                 sc->callback = callback;
385                 sc->closure = cb_closure;
386
387                 pthread_mutex_unlock(&stubws->mutex);
388                 sc->subcallid = ptr2id(sc);
389                 do {
390                         sc->subcallid++;
391                         osc = stubws->subcalls;
392                         while(osc && osc->subcallid != sc->subcallid)
393                                 osc = osc->next;
394                 } while (osc);
395                 sc->next = stubws->subcalls;
396                 stubws->subcalls = sc;
397                 pthread_mutex_unlock(&stubws->mutex);
398
399                 if (writebuf_char(&wb, CHAR_FOR_SUBCALL_CALL)
400                  && writebuf_uint32(&wb, wreq->msgid)
401                  && writebuf_uint32(&wb, sc->subcallid)
402                  && writebuf_string(&wb, api)
403                  && writebuf_string(&wb, verb)
404                  && writebuf_object(&wb, args)) {
405                         rc = afb_ws_binary_v(wreq->stubws->ws, wb.iovec, wb.count);
406                         if (rc >= 0)
407                                 return;
408                 }
409                 ERROR("error while sending fail");
410         }
411 }
412
413 static int server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event)
414 {
415         int rc, rc2;
416         struct writebuf wb = { .count = 0 };
417         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
418
419         rc = afb_evt_add_watch(wreq->stubws->listener, event);
420         if (rc < 0)
421                 return rc;
422
423         if (writebuf_char(&wb, CHAR_FOR_EVT_SUBSCRIBE)
424          && writebuf_uint32(&wb, wreq->msgid)
425          && writebuf_uint32(&wb, (uint32_t)afb_evt_event_id(event))
426          && writebuf_string(&wb, afb_evt_event_name(event))) {
427                 rc2 = afb_ws_binary_v(wreq->stubws->ws, wb.iovec, wb.count);
428                 if (rc2 >= 0)
429                         goto success;
430         }
431         ERROR("error while subscribing event");
432 success:
433         return rc;
434 }
435
436 static int server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event)
437 {
438         int rc, rc2;
439         struct writebuf wb = { .count = 0 };
440         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
441
442         if (writebuf_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE)
443          && writebuf_uint32(&wb, wreq->msgid)
444          && writebuf_uint32(&wb, (uint32_t)afb_evt_event_id(event))
445          && writebuf_string(&wb, afb_evt_event_name(event))) {
446                 rc2 = afb_ws_binary_v(wreq->stubws->ws, wb.iovec, wb.count);
447                 if (rc2 >= 0)
448                         goto success;
449         }
450         ERROR("error while subscribing event");
451 success:
452         rc = afb_evt_remove_watch(wreq->stubws->listener, event);
453         return rc;
454 }
455
456 static const struct afb_xreq_query_itf server_req_xreq_itf = {
457         .success = server_req_success_cb,
458         .fail = server_req_fail_cb,
459         .unref = server_req_destroy_cb,
460         .subcall = server_req_subcall_cb,
461         .subscribe = server_req_subscribe_cb,
462         .unsubscribe = server_req_unsubscribe_cb
463 };
464
465 /******************* client part **********************************/
466
467 /* search a memorized call */
468 static struct client_call *client_call_search(struct afb_stub_ws *stubws, uint32_t msgid)
469 {
470         struct client_call *call;
471
472         call = stubws->calls;
473         while (call != NULL && call->msgid != msgid)
474                 call = call->next;
475
476         return call;
477 }
478
479 /* search the event */
480 static struct client_event *client_event_search(struct afb_stub_ws *stubws, uint32_t eventid, const char *name)
481 {
482         struct client_event *ev;
483
484         ev = stubws->events;
485         while (ev != NULL && (ev->eventid != eventid || 0 != strcmp(afb_evt_event_name(ev->event), name)))
486                 ev = ev->next;
487
488         return ev;
489 }
490
491
492 /* allocates and init the memorizing call */
493 static struct client_call *client_call_make(struct afb_stub_ws *stubws, struct afb_xreq *xreq)
494 {
495         struct client_call *call;
496
497         call = malloc(sizeof *call);
498         if (call != NULL) {
499                 afb_xreq_addref(xreq);
500                 call->xreq = xreq;
501                 call->msgid = ptr2id(call);
502                 while(client_call_search(stubws, call->msgid) != NULL)
503                         call->msgid++;
504                 call->stubws = stubws;
505                 call->next = stubws->calls;
506                 stubws->calls = call;
507         }
508         return call;
509 }
510
511 /* free and release the memorizing call */
512 static void client_call_destroy(struct client_call *call)
513 {
514         struct client_call **prv;
515
516         prv = &call->stubws->calls;
517         while (*prv != NULL) {
518                 if (*prv == call) {
519                         *prv = call->next;
520                         break;
521                 }
522                 prv = &(*prv)->next;
523         }
524
525         afb_xreq_unref(call->xreq);
526         free(call);
527 }
528
529 /* get event data from the message */
530 static int client_msg_event_read(struct readbuf *rb, uint32_t *eventid, const char **name)
531 {
532         return readbuf_uint32(rb, eventid) && readbuf_string(rb, name, NULL);
533 }
534
535 /* get event from the message */
536 static int client_msg_event_get(struct afb_stub_ws *stubws, struct readbuf *rb, struct client_event **ev)
537 {
538         const char *name;
539         uint32_t eventid;
540
541         /* get event data from the message */
542         if (!client_msg_event_read(rb, &eventid, &name)) {
543                 ERROR("Invalid message");
544                 return 0;
545         }
546
547         /* check conflicts */
548         *ev = client_event_search(stubws, eventid, name);
549         if (*ev == NULL) {
550                 ERROR("event %s not found", name);
551                 return 0;
552         }
553
554         return 1;
555 }
556
557 /* get event from the message */
558 static int client_msg_call_get(struct afb_stub_ws *stubws, struct readbuf *rb, struct client_call **call)
559 {
560         uint32_t msgid;
561
562         /* get event data from the message */
563         if (!readbuf_uint32(rb, &msgid)) {
564                 ERROR("Invalid message");
565                 return 0;
566         }
567
568         /* get the call */
569         *call = client_call_search(stubws, msgid);
570         if (*call == NULL) {
571                 ERROR("message not found");
572                 return 0;
573         }
574
575         return 1;
576 }
577
578 /* read a subscrition message */
579 static int client_msg_subscription_get(struct afb_stub_ws *stubws, struct readbuf *rb, struct client_call **call, struct client_event **ev)
580 {
581         return client_msg_call_get(stubws, rb, call) && client_msg_event_get(stubws, rb, ev);
582 }
583
584 /* adds an event */
585 static void client_event_create(struct afb_stub_ws *stubws, struct readbuf *rb)
586 {
587         size_t offset;
588         const char *name;
589         uint32_t eventid;
590         struct client_event *ev;
591
592         /* get event data from the message */
593         offset = client_msg_event_read(rb, &eventid, &name);
594         if (offset == 0) {
595                 ERROR("Invalid message");
596                 return;
597         }
598
599         /* check conflicts */
600         ev = client_event_search(stubws, eventid, name);
601         if (ev != NULL) {
602                 ev->refcount++;
603                 return;
604         }
605
606         /* no conflict, try to add it */
607         ev = malloc(sizeof *ev);
608         if (ev != NULL) {
609                 ev->event = afb_evt_create_event(name);
610                 if (ev->event.closure == NULL)
611                         free(ev);
612                 else {
613                         ev->refcount = 1;
614                         ev->eventid = eventid;
615                         ev->next = stubws->events;
616                         stubws->events = ev;
617                         return;
618                 }
619         }
620         ERROR("can't create event %s, out of memory", name);
621 }
622
623 /* removes an event */
624 static void client_event_drop(struct afb_stub_ws *stubws, struct readbuf *rb)
625 {
626         struct client_event *ev, **prv;
627
628         /* retrieves the event */
629         if (!client_msg_event_get(stubws, rb, &ev))
630                 return;
631
632         /* decrease the reference count */
633         if (--ev->refcount)
634                 return;
635
636         /* unlinks the event */
637         prv = &stubws->events;
638         while (*prv != ev)
639                 prv = &(*prv)->next;
640         *prv = ev->next;
641
642         /* destroys the event */
643         afb_event_drop(ev->event);
644         free(ev);
645 }
646
647 /* subscribes an event */
648 static void client_event_subscribe(struct afb_stub_ws *stubws, struct readbuf *rb)
649 {
650         struct client_event *ev;
651         struct client_call *call;
652
653         if (client_msg_subscription_get(stubws, rb, &call, &ev)) {
654                 /* subscribe the request from the event */
655                 if (afb_xreq_subscribe(call->xreq, ev->event) < 0)
656                         ERROR("can't subscribe: %m");
657         }
658 }
659
660 /* unsubscribes an event */
661 static void client_event_unsubscribe(struct afb_stub_ws *stubws, struct readbuf *rb)
662 {
663         struct client_event *ev;
664         struct client_call *call;
665
666         if (client_msg_subscription_get(stubws, rb, &call, &ev)) {
667                 /* unsubscribe the request from the event */
668                 if (afb_xreq_unsubscribe(call->xreq, ev->event) < 0)
669                         ERROR("can't unsubscribe: %m");
670         }
671 }
672
673 /* receives broadcasted events */
674 static void client_event_broadcast(struct afb_stub_ws *stubws, struct readbuf *rb)
675 {
676         struct json_object *object;
677         const char *event;
678
679         if (readbuf_string(rb, &event, NULL) && readbuf_object(rb, &object))
680                 afb_evt_broadcast(event, object);
681         else
682                 ERROR("unreadable broadcasted event");
683 }
684
685 /* pushs an event */
686 static void client_event_push(struct afb_stub_ws *stubws, struct readbuf *rb)
687 {
688         struct client_event *ev;
689         struct json_object *object;
690
691         if (client_msg_event_get(stubws, rb, &ev) && readbuf_object(rb, &object))
692                 afb_event_push(ev->event, object);
693         else
694                 ERROR("unreadable push event");
695 }
696
697 static void client_reply_success(struct afb_stub_ws *stubws, struct readbuf *rb)
698 {
699         struct client_call *call;
700         struct json_object *object;
701         const char *info;
702         uint32_t flags;
703
704         /* retrieve the message data */
705         if (!client_msg_call_get(stubws, rb, &call))
706                 return;
707
708         if (readbuf_uint32(rb, &flags)
709          && readbuf_string(rb, &info, NULL)
710          && readbuf_object(rb, &object)) {
711                 call->xreq->context.flags = (unsigned)flags;
712                 afb_xreq_success(call->xreq, object, *info ? info : NULL);
713         } else {
714                 /* failing to have the answer */
715                 afb_xreq_fail(call->xreq, "error", "ws error");
716         }
717         client_call_destroy(call);
718 }
719
720 static void client_reply_fail(struct afb_stub_ws *stubws, struct readbuf *rb)
721 {
722         struct client_call *call;
723         const char *info, *status;
724         uint32_t flags;
725
726         /* retrieve the message data */
727         if (!client_msg_call_get(stubws, rb, &call))
728                 return;
729
730         if (readbuf_uint32(rb, &flags)
731          && readbuf_string(rb, &status, NULL)
732          && readbuf_string(rb, &info, NULL)) {
733                 call->xreq->context.flags = (unsigned)flags;
734                 afb_xreq_fail(call->xreq, status, *info ? info : NULL);
735         } else {
736                 /* failing to have the answer */
737                 afb_xreq_fail(call->xreq, "error", "ws error");
738         }
739         client_call_destroy(call);
740 }
741
742 /* send a subcall reply */
743 static void client_send_subcall_reply(struct client_subcall *subcall, int status, json_object *object)
744 {
745         int rc;
746         struct writebuf wb = { .count = 0 };
747         char ie = status < 0;
748
749         if (!writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY)
750          || !writebuf_uint32(&wb, subcall->subcallid)
751          || !writebuf_char(&wb, ie)
752          || !writebuf_object(&wb, object)) {
753                 /* write error ? */
754                 return;
755         }
756
757         rc = afb_ws_binary_v(subcall->stubws->ws, wb.iovec, wb.count);
758         if (rc >= 0)
759                 return;
760         ERROR("error while sending subcall reply");
761 }
762
763 /* callback for subcall reply */
764 static void client_subcall_reply_cb(void *closure, int status, json_object *object)
765 {
766         client_send_subcall_reply(closure, status, object);
767         free(closure);
768 }
769
770 /* received a subcall request */
771 static void client_subcall(struct afb_stub_ws *stubws, struct readbuf *rb)
772 {
773         struct client_subcall *subcall;
774         struct client_call *call;
775         const char *api, *verb;
776         uint32_t subcallid;
777         struct json_object *object;
778
779         subcall = malloc(sizeof *subcall);
780         if (!subcall)
781                 return;
782
783         /* retrieve the message data */
784         if (!client_msg_call_get(stubws, rb, &call))
785                 return;
786
787         if (readbuf_uint32(rb, &subcallid)
788          && readbuf_string(rb, &api, NULL)
789          && readbuf_string(rb, &verb, NULL)
790          && readbuf_object(rb, &object)) {
791                 subcall->stubws = stubws;
792                 subcall->subcallid = subcallid;
793                 afb_xreq_subcall(call->xreq, api, verb, object, client_subcall_reply_cb, subcall);
794         }
795 }
796
797 /* pushs an event */
798 static void client_on_description(struct afb_stub_ws *stubws, struct readbuf *rb)
799 {
800         uint32_t descid;
801         struct client_describe *desc;
802         struct json_object *object;
803
804         if (!readbuf_uint32(rb, &descid))
805                 ERROR("unreadable description");
806         else {
807                 desc = stubws->describes;
808                 while (desc && desc->descid != descid)
809                         desc = desc->next;
810                 if (desc == NULL)
811                         ERROR("unexpected description");
812                 else {
813                         if (readbuf_object(rb, &object))
814                                 desc->result = object;
815                         else
816                                 ERROR("bad description");
817                         jobs_leave(desc->jobloop);
818                 }
819         }
820 }
821
822 /* callback when receiving binary data */
823 static void client_on_binary(void *closure, char *data, size_t size)
824 {
825         if (size > 0) {
826                 struct afb_stub_ws *stubws = closure;
827                 struct readbuf rb = { .head = data, .end = data + size };
828
829                 pthread_mutex_lock(&stubws->mutex);
830                 switch (*rb.head++) {
831                 case CHAR_FOR_ANSWER_SUCCESS: /* success */
832                         client_reply_success(stubws, &rb);
833                         break;
834                 case CHAR_FOR_ANSWER_FAIL: /* fail */
835                         client_reply_fail(stubws, &rb);
836                         break;
837                 case CHAR_FOR_EVT_BROADCAST: /* broadcast */
838                         client_event_broadcast(stubws, &rb);
839                         break;
840                 case CHAR_FOR_EVT_ADD: /* creates the event */
841                         client_event_create(stubws, &rb);
842                         break;
843                 case CHAR_FOR_EVT_DEL: /* drops the event */
844                         client_event_drop(stubws, &rb);
845                         break;
846                 case CHAR_FOR_EVT_PUSH: /* pushs the event */
847                         client_event_push(stubws, &rb);
848                         break;
849                 case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
850                         client_event_subscribe(stubws, &rb);
851                         break;
852                 case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
853                         client_event_unsubscribe(stubws, &rb);
854                         break;
855                 case CHAR_FOR_SUBCALL_CALL: /* subcall */
856                         client_subcall(stubws, &rb);
857                         break;
858                 case CHAR_FOR_DESCRIPTION: /* description */
859                         client_on_description(stubws, &rb);
860                         break;
861                 default: /* unexpected message */
862                         /* TODO: close the connection */
863                         break;
864                 }
865                 pthread_mutex_unlock(&stubws->mutex);
866         }
867         free(data);
868 }
869
870 /* on call, propagate it to the ws service */
871 static void client_call_cb(void * closure, struct afb_xreq *xreq)
872 {
873         int rc;
874         struct client_call *call;
875         struct writebuf wb = { .count = 0 };
876         const char *raw;
877         size_t szraw;
878         struct afb_stub_ws *stubws = closure;
879
880         pthread_mutex_lock(&stubws->mutex);
881
882         /* create the recording data */
883         call = client_call_make(stubws, xreq);
884         if (call == NULL) {
885                 afb_xreq_fail_f(xreq, "error", "out of memory");
886                 goto end;
887         }
888
889         /* creates the call message */
890         raw = afb_xreq_raw(xreq, &szraw);
891         if (raw == NULL)
892                 goto internal_error;
893         if (!writebuf_char(&wb, CHAR_FOR_CALL)
894          || !writebuf_uint32(&wb, call->msgid)
895          || !writebuf_uint32(&wb, (uint32_t)xreq->context.flags)
896          || !writebuf_string(&wb, xreq->verb)
897          || !writebuf_string(&wb, afb_session_uuid(xreq->context.session))
898          || !writebuf_string_length(&wb, raw, szraw))
899                 goto overflow;
900
901         /* send */
902         rc = afb_ws_binary_v(stubws->ws, wb.iovec, wb.count);
903         if (rc >= 0)
904                 goto end;
905
906         afb_xreq_fail(xreq, "error", "websocket sending error");
907         goto clean_call;
908
909 internal_error:
910         afb_xreq_fail(xreq, "error", "internal: raw is NULL!");
911         goto clean_call;
912
913 overflow:
914         afb_xreq_fail(xreq, "error", "overflow: size doesn't match 32 bits!");
915
916 clean_call:
917         client_call_destroy(call);
918 end:
919         pthread_mutex_unlock(&stubws->mutex);
920 }
921
922 static void client_send_describe_cb(int signum, void *closure, struct jobloop *jobloop)
923 {
924         struct client_describe *desc = closure;
925         struct writebuf wb = { .count = 0 };
926
927         if (!signum) {
928                 /* record the jobloop */
929                 desc->jobloop = jobloop;
930
931                 /* send */
932                 if (writebuf_char(&wb, CHAR_FOR_DESCRIBE)
933                  && writebuf_uint32(&wb, desc->descid)
934                  && afb_ws_binary_v(desc->stubws->ws, wb.iovec, wb.count) >= 0)
935                         return;
936         }
937         jobs_leave(jobloop);
938 }
939
940 /* get the description */
941 static struct json_object *client_describe_cb(void * closure)
942 {
943         struct client_describe desc, *d;
944         struct afb_stub_ws *stubws = closure;
945
946         /* fill in stack the description of the task */
947         pthread_mutex_lock(&stubws->mutex);
948         desc.result = NULL;
949         desc.descid = ptr2id(&desc);
950         d = stubws->describes;
951         while (d) {
952                 if (d->descid != desc.descid)
953                         d = d->next;
954                 else {
955                         desc.descid++;
956                         d = stubws->describes;
957                 }
958         }
959         desc.stubws = stubws;
960         desc.next = stubws->describes;
961         stubws->describes = &desc;
962         pthread_mutex_unlock(&stubws->mutex);
963
964         /* synchronous job: send the request and wait its result */
965         jobs_enter(NULL, 0, client_send_describe_cb, &desc);
966
967         /* unlink and send the result */
968         pthread_mutex_lock(&stubws->mutex);
969         d = stubws->describes;
970         if (d == &desc)
971                 stubws->describes = desc.next;
972         else {
973                 while (d) {
974                         if (d->next != &desc)
975                                 d = d->next;
976                         else {
977                                 d->next = desc.next;
978                                 d = NULL;
979                         }
980                 }
981         }
982         pthread_mutex_unlock(&stubws->mutex);
983         return desc.result;
984 }
985
986 /******************* client description part for server *****************************/
987
988 /* on call, propagate it to the ws service */
989 static void server_on_call(struct afb_stub_ws *stubws, struct readbuf *rb)
990 {
991         struct server_req *wreq;
992         char *cverb;
993         const char *uuid, *verb;
994         uint32_t flags, msgid;
995         size_t lenverb;
996         struct json_object *object;
997
998         afb_stub_ws_addref(stubws);
999
1000         /* reads the call message data */
1001         if (!readbuf_uint32(rb, &msgid)
1002          || !readbuf_uint32(rb, &flags)
1003          || !readbuf_string(rb, &verb, &lenverb)
1004          || !readbuf_string(rb, &uuid, NULL)
1005          || !readbuf_object(rb, &object))
1006                 goto overflow;
1007
1008         /* create the request */
1009         wreq = malloc(++lenverb + sizeof *wreq);
1010         if (wreq == NULL)
1011                 goto out_of_memory;
1012
1013         afb_xreq_init(&wreq->xreq, &server_req_xreq_itf);
1014         wreq->stubws = stubws;
1015         wreq->msgid = msgid;
1016         cverb = (char*)&wreq[1];
1017         memcpy(cverb, verb, lenverb);
1018
1019         /* init the context */
1020         if (afb_context_connect(&wreq->xreq.context, uuid, NULL) < 0)
1021                 goto unconnected;
1022         wreq->xreq.context.flags = flags;
1023
1024         /* makes the call */
1025         wreq->xreq.cred = afb_cred_addref(stubws->cred);
1026         wreq->xreq.api = stubws->apiname;
1027         wreq->xreq.verb = cverb;
1028         wreq->xreq.json = object;
1029         afb_xreq_process(&wreq->xreq, stubws->apiset);
1030         return;
1031
1032 unconnected:
1033         free(wreq);
1034 out_of_memory:
1035         json_object_put(object);
1036 overflow:
1037         afb_stub_ws_unref(stubws);
1038 }
1039
1040 /* on subcall reply */
1041 static void server_on_subcall_reply(struct afb_stub_ws *stubws, struct readbuf *rb)
1042 {
1043         char ie;
1044         uint32_t subcallid;
1045         struct json_object *object;
1046         struct server_subcall *sc, **psc;
1047
1048         /* reads the call message data */
1049         if (!readbuf_uint32(rb, &subcallid)
1050          || !readbuf_char(rb, &ie)
1051          || !readbuf_object(rb, &object)) {
1052                 /* TODO bad protocol */
1053                 return;
1054         }
1055
1056         /* search the subcall and unlink it */
1057         pthread_mutex_lock(&stubws->mutex);
1058         psc = &stubws->subcalls;
1059         while ((sc = *psc) && sc->subcallid != subcallid)
1060                 psc = &sc->next;
1061         if (!sc) {
1062                 pthread_mutex_unlock(&stubws->mutex);
1063                 /* TODO subcall not found */
1064         } else {
1065                 *psc = sc->next;
1066                 pthread_mutex_unlock(&stubws->mutex);
1067                 sc->callback(sc->closure, -(int)ie, object);
1068                 free(sc);
1069         }
1070         json_object_put(object);
1071 }
1072
1073 static void server_send_description(struct afb_stub_ws *stubws, uint32_t descid, struct json_object *descobj)
1074 {
1075         struct writebuf wb = { .count = 0 };
1076
1077         if (!writebuf_char(&wb, CHAR_FOR_DESCRIPTION)
1078          || !writebuf_uint32(&wb, descid)
1079          || !writebuf_object(&wb, descobj)
1080          || afb_ws_binary_v(stubws->ws, wb.iovec, wb.count) < 0)
1081                 ERROR("can't send description");
1082 }
1083
1084 static void server_describe_job(int signum, void *closure)
1085 {
1086         struct afb_api api;
1087         struct json_object *obj;
1088         struct server_describe *desc = closure;
1089
1090         /* get the description if possible */
1091         obj = NULL;
1092         if (!signum
1093          && !afb_apiset_get(desc->stubws->apiset, desc->stubws->apiname, &api)
1094          && api.itf->describe) {
1095                 obj = api.itf->describe(api.closure);
1096         }
1097
1098         /* send it */
1099         server_send_description(desc->stubws, desc->descid, obj);
1100         json_object_put(obj);
1101         afb_stub_ws_unref(desc->stubws);
1102         free(desc);
1103 }
1104
1105 /* on describe, propagate it to the ws service */
1106 static void server_on_describe(struct afb_stub_ws *stubws, struct readbuf *rb)
1107 {
1108
1109         uint32_t descid;
1110         struct server_describe *desc;
1111
1112         /* reads the descid */
1113         if (readbuf_uint32(rb, &descid)) {
1114                 /* create asynchronous job */
1115                 desc = malloc(sizeof *desc);
1116                 if (desc) {
1117                         desc->descid = descid;
1118                         desc->stubws = stubws;
1119                         afb_stub_ws_addref(stubws);
1120                         if (jobs_queue(NULL, 0, server_describe_job, desc) < 0)
1121                                 server_describe_job(0, desc);
1122                         return;
1123                 }
1124                 server_send_description(stubws, descid, NULL);
1125         }
1126         ERROR("can't provide description");
1127 }
1128
1129 /* callback when receiving binary data */
1130 static void server_on_binary(void *closure, char *data, size_t size)
1131 {
1132         if (size > 0) {
1133                 struct readbuf rb = { .head = data, .end = data + size };
1134                 switch (*rb.head++) {
1135                 case CHAR_FOR_CALL:
1136                         server_on_call(closure, &rb);
1137                         break;
1138                 case CHAR_FOR_SUBCALL_REPLY:
1139                         server_on_subcall_reply(closure, &rb);
1140                         break;
1141                 case CHAR_FOR_DESCRIBE:
1142                         server_on_describe(closure, &rb);
1143                         break;
1144                 default: /* unexpected message */
1145                         /* TODO: close the connection */
1146                         break;
1147                 }
1148         }
1149         free(data);
1150 }
1151
1152 /******************* server part: manage events **********************************/
1153
1154 static void server_event_send(struct afb_stub_ws *stubws, char order, const char *event, int eventid, const char *data)
1155 {
1156         int rc;
1157         struct writebuf wb = { .count = 0 };
1158
1159         if (writebuf_char(&wb, order)
1160          && writebuf_uint32(&wb, eventid)
1161          && writebuf_string(&wb, event)
1162          && (data == NULL || writebuf_string(&wb, data))) {
1163                 rc = afb_ws_binary_v(stubws->ws, wb.iovec, wb.count);
1164                 if (rc >= 0)
1165                         return;
1166         }
1167         ERROR("error while sending %c for event %s", order, event);
1168 }
1169
1170 static void server_event_add(void *closure, const char *event, int eventid)
1171 {
1172         server_event_send(closure, CHAR_FOR_EVT_ADD, event, eventid, NULL);
1173 }
1174
1175 static void server_event_remove(void *closure, const char *event, int eventid)
1176 {
1177         server_event_send(closure, CHAR_FOR_EVT_DEL, event, eventid, NULL);
1178 }
1179
1180 static void server_event_push(void *closure, const char *event, int eventid, struct json_object *object)
1181 {
1182         const char *data = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
1183         server_event_send(closure, CHAR_FOR_EVT_PUSH, event, eventid, data ? : "null");
1184         json_object_put(object);
1185 }
1186
1187 static void server_event_broadcast(void *closure, const char *event, int eventid, struct json_object *object)
1188 {
1189         int rc;
1190         struct afb_stub_ws *stubws = closure;
1191
1192         struct writebuf wb = { .count = 0 };
1193
1194         if (writebuf_char(&wb, CHAR_FOR_EVT_BROADCAST) && writebuf_string(&wb, event) && writebuf_object(&wb, object)) {
1195                 rc = afb_ws_binary_v(stubws->ws, wb.iovec, wb.count);
1196                 if (rc < 0)
1197                         ERROR("error while broadcasting event %s", event);
1198         } else
1199                 ERROR("error while broadcasting event %s", event);
1200         json_object_put(object);
1201 }
1202
1203 /*****************************************************/
1204
1205 /* callback when receiving a hangup */
1206 static void server_on_hangup(void *closure)
1207 {
1208         struct afb_stub_ws *stubws = closure;
1209
1210         /* close the socket */
1211         if (stubws->fd >= 0) {
1212                 close(stubws->fd);
1213                 stubws->fd = -1;
1214         }
1215
1216         /* release the client */
1217         afb_stub_ws_unref(stubws);
1218 }
1219
1220 /*****************************************************/
1221
1222 /* the interface for events pushing */
1223 static const struct afb_evt_itf server_evt_itf = {
1224         .broadcast = server_event_broadcast,
1225         .push = server_event_push,
1226         .add = server_event_add,
1227         .remove = server_event_remove
1228 };
1229
1230 static const struct afb_ws_itf stub_ws_client_ws_itf =
1231 {
1232         .on_close = NULL,
1233         .on_text = NULL,
1234         .on_binary = client_on_binary,
1235         .on_error = NULL,
1236         .on_hangup = NULL
1237 };
1238
1239 static const struct afb_ws_itf server_ws_itf =
1240 {
1241         .on_close = NULL,
1242         .on_text = NULL,
1243         .on_binary = server_on_binary,
1244         .on_error = NULL,
1245         .on_hangup = server_on_hangup
1246 };
1247
1248 static struct afb_api_itf ws_api_itf = {
1249         .call = client_call_cb,
1250         .describe = client_describe_cb
1251 };
1252
1253 /*****************************************************/
1254
1255 static struct afb_stub_ws *afb_stub_ws_create(int fd, const char *apiname, struct afb_apiset *apiset, const struct afb_ws_itf *itf)
1256 {
1257         struct afb_stub_ws *stubws;
1258
1259         stubws = calloc(1, sizeof *stubws + strlen(apiname));
1260         if (stubws == NULL)
1261                 errno = ENOMEM;
1262         else {
1263                 fcntl(fd, F_SETFD, FD_CLOEXEC);
1264                 fcntl(fd, F_SETFL, O_NONBLOCK);
1265                 stubws->ws = afb_ws_create(afb_common_get_event_loop(), fd, itf, stubws);
1266                 if (stubws->ws != NULL) {
1267                         stubws->fd = fd;
1268                         strcpy(stubws->apiname, apiname);
1269                         stubws->apiset = afb_apiset_addref(apiset);
1270                         stubws->refcount = 1;
1271                         stubws->subcalls = NULL;
1272                         return stubws;
1273                 }
1274                 free(stubws);
1275         }
1276         return NULL;
1277 }
1278
1279 struct afb_stub_ws *afb_stub_ws_create_client(int fd, const char *apiname, struct afb_apiset *apiset)
1280 {
1281         struct afb_api afb_api;
1282         struct afb_stub_ws *stubws;
1283
1284         stubws = afb_stub_ws_create(fd, apiname, apiset, &stub_ws_client_ws_itf);
1285         if (stubws) {
1286                 afb_api.closure = stubws;
1287                 afb_api.itf = &ws_api_itf;
1288                 if (afb_apiset_add(apiset, stubws->apiname, afb_api) >= 0)
1289                         return stubws;
1290                 afb_stub_ws_unref(stubws);
1291         }
1292         return NULL;
1293
1294 }
1295
1296 struct afb_stub_ws *afb_stub_ws_create_server(int fd, const char *apiname, struct afb_apiset *apiset)
1297 {
1298         struct afb_stub_ws *stubws;
1299
1300         stubws = afb_stub_ws_create(fd, apiname, apiset, &server_ws_itf);
1301         if (stubws) {
1302                 stubws->cred = afb_cred_create_for_socket(fd);
1303                 stubws->listener = afb_evt_listener_create(&server_evt_itf, stubws);
1304                 if (stubws->listener != NULL)
1305                         return stubws;
1306                 afb_stub_ws_unref(stubws);
1307         }
1308         return NULL;
1309 }
1310
1311 void afb_stub_ws_unref(struct afb_stub_ws *stubws)
1312 {
1313         struct server_subcall *sc, *nsc;
1314
1315         if (!__atomic_sub_fetch(&stubws->refcount, 1, __ATOMIC_RELAXED)) {
1316                 afb_evt_listener_unref(stubws->listener);
1317                 afb_ws_destroy(stubws->ws);
1318                 nsc = stubws->subcalls;
1319                 while (nsc) {
1320                         sc= nsc;
1321                         nsc = sc->next;
1322                         sc->callback(sc->closure, 1, NULL);
1323                         free(sc);
1324                 }
1325                 afb_cred_unref(stubws->cred);
1326                 afb_apiset_unref(stubws->apiset);
1327                 free(stubws);
1328         }
1329 }
1330
1331 void afb_stub_ws_addref(struct afb_stub_ws *stubws)
1332 {
1333         __atomic_add_fetch(&stubws->refcount, 1, __ATOMIC_RELAXED);
1334 }
1335