cleanup and improvement of xreq
[src/app-framework-binder.git] / src / afb-stub-ws.c
1 /*
2  * Copyright (C) 2015, 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #define NO_PLUGIN_VERBOSE_MACRO
21
22 #include <stdlib.h>
23 #include <string.h>
24 #include <assert.h>
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <errno.h>
28 #include <endian.h>
29 #include <netdb.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <sys/un.h>
33 #include <pthread.h>
34
35 #include <json-c/json.h>
36 #include <systemd/sd-event.h>
37
38 #include <afb/afb-req-itf.h>
39
40 #include "afb-common.h"
41
42 #include "afb-session.h"
43 #include "afb-cred.h"
44 #include "afb-ws.h"
45 #include "afb-msg-json.h"
46 #include "afb-api.h"
47 #include "afb-apiset.h"
48 #include "afb-stub-ws.h"
49 #include "afb-context.h"
50 #include "afb-evt.h"
51 #include "afb-xreq.h"
52 #include "verbose.h"
53
54 struct client_call;
55 struct client_event;
56 struct afb_stub_ws;
57
58 /************** constants for protocol definition *************************/
59
60 #define CHAR_FOR_CALL             'C'
61 #define CHAR_FOR_ANSWER_SUCCESS   'T'
62 #define CHAR_FOR_ANSWER_FAIL      'F'
63 #define CHAR_FOR_EVT_BROADCAST    '*'
64 #define CHAR_FOR_EVT_ADD          '+'
65 #define CHAR_FOR_EVT_DEL          '-'
66 #define CHAR_FOR_EVT_PUSH         '!'
67 #define CHAR_FOR_EVT_SUBSCRIBE    'S'
68 #define CHAR_FOR_EVT_UNSUBSCRIBE  'U'
69 #define CHAR_FOR_SUBCALL_CALL     'B'
70 #define CHAR_FOR_SUBCALL_REPLY    'R'
71
72 /******************* handling subcalls *****************************/
73
74 /**
75  * Structure on server side for recording pending
76  * subcalls.
77  */
78 struct server_subcall
79 {
80         struct server_subcall *next;    /**< next subcall for the client */
81         uint32_t subcallid;             /**< the subcallid */
82         void (*callback)(void*, int, struct json_object*); /**< callback on completion */
83         void *closure;                  /**< closure of the callback */
84 };
85
86 /**
87  * Structure for sending back replies on client side
88  */
89 struct client_subcall
90 {
91         struct afb_stub_ws *stubws;     /**< stub descriptor */
92         uint32_t subcallid;             /**< subcallid for the reply */
93 };
94
95 /*
96  * structure for recording calls on client side
97  */
98 struct client_call {
99         struct client_call *next;       /* the next call */
100         struct afb_stub_ws *stubws;     /* the stub_ws */
101         struct afb_xreq *xreq;          /* the request handle */
102         uint32_t msgid;                 /* the message identifier */
103 };
104
105 /*
106  * structure for a ws request
107  */
108 struct server_req {
109         struct afb_xreq xreq;           /* the xreq */
110         struct afb_stub_ws *stubws;     /* the client of the request */
111         uint32_t msgid;                 /* the incoming request msgid */
112 };
113
114 /*
115  * structure for recording events on client side
116  */
117 struct client_event
118 {
119         struct client_event *next;
120         struct afb_event event;
121         int eventid;
122         int refcount;
123 };
124
125 /******************* client description part for server *****************************/
126
127 struct afb_stub_ws
128 {
129         /* count of references */
130         int refcount;
131
132         /* file descriptor */
133         int fd;
134
135         /* resource control */
136         pthread_mutex_t mutex;
137
138         /* websocket */
139         struct afb_ws *ws;
140
141         /* listener for events (server side) */
142         struct afb_evt_listener *listener;
143
144         /* event replica (client side) */
145         struct client_event *events;
146
147         /* emitted calls (client side) */
148         struct client_call *calls;
149
150         /* credentials (server side) */
151         struct afb_cred *cred;
152
153         /* pending subcalls (server side) */
154         struct server_subcall *subcalls;
155
156         /* apiset */
157         struct afb_apiset *apiset;
158
159         /* the api name */
160         char apiname[1];
161 };
162
163 /******************* common useful tools **********************************/
164
165 /**
166  * translate a pointer to some integer
167  * @param ptr the pointer to translate
168  * @return an integer
169  */
170 static inline uint32_t ptr2id(void *ptr)
171 {
172         return (uint32_t)(((intptr_t)ptr) >> 6);
173 }
174
175 /******************* serialisation part **********************************/
176
177 struct readbuf
178 {
179         char *head, *end;
180 };
181
182 #define WRITEBUF_COUNT_MAX  32
183 struct writebuf
184 {
185         struct iovec iovec[WRITEBUF_COUNT_MAX];
186         uint32_t uints[WRITEBUF_COUNT_MAX];
187         int count;
188 };
189
190 static char *readbuf_get(struct readbuf *rb, uint32_t length)
191 {
192         char *before = rb->head;
193         char *after = before + length;
194         if (after > rb->end)
195                 return 0;
196         rb->head = after;
197         return before;
198 }
199
200 static int readbuf_char(struct readbuf *rb, char *value)
201 {
202         if (rb->head >= rb->end)
203                 return 0;
204         *value = *rb->head++;
205         return 1;
206 }
207
208 static int readbuf_uint32(struct readbuf *rb, uint32_t *value)
209 {
210         char *after = rb->head + sizeof *value;
211         if (after > rb->end)
212                 return 0;
213         memcpy(value, rb->head, sizeof *value);
214         rb->head = after;
215         *value = le32toh(*value);
216         return 1;
217 }
218
219 static int readbuf_string(struct readbuf *rb, const char **value, size_t *length)
220 {
221         uint32_t len;
222         if (!readbuf_uint32(rb, &len) || !len)
223                 return 0;
224         if (length)
225                 *length = (size_t)(len - 1);
226         return (*value = readbuf_get(rb, len)) != NULL &&  rb->head[-1] == 0;
227 }
228
229 static int readbuf_object(struct readbuf *rb, struct json_object **object)
230 {
231         const char *string;
232         struct json_object *o;
233         int rc = readbuf_string(rb, &string, NULL);
234         if (rc) {
235                 o = json_tokener_parse(string);
236                 if (o == NULL && strcmp(string, "null"))
237                         o = json_object_new_string(string);
238                 *object = o;
239         }
240         return rc;
241 }
242
243 static int writebuf_put(struct writebuf *wb, const void *value, size_t length)
244 {
245         int i = wb->count;
246         if (i == WRITEBUF_COUNT_MAX)
247                 return 0;
248         wb->iovec[i].iov_base = (void*)value;
249         wb->iovec[i].iov_len = length;
250         wb->count = i + 1;
251         return 1;
252 }
253
254 static int writebuf_char(struct writebuf *wb, char value)
255 {
256         int i = wb->count;
257         if (i == WRITEBUF_COUNT_MAX)
258                 return 0;
259         *(char*)&wb->uints[i] = value;
260         wb->iovec[i].iov_base = &wb->uints[i];
261         wb->iovec[i].iov_len = 1;
262         wb->count = i + 1;
263         return 1;
264 }
265
266 static int writebuf_uint32(struct writebuf *wb, uint32_t value)
267 {
268         int i = wb->count;
269         if (i == WRITEBUF_COUNT_MAX)
270                 return 0;
271         wb->uints[i] = htole32(value);
272         wb->iovec[i].iov_base = &wb->uints[i];
273         wb->iovec[i].iov_len = sizeof wb->uints[i];
274         wb->count = i + 1;
275         return 1;
276 }
277
278 static int writebuf_string_length(struct writebuf *wb, const char *value, size_t length)
279 {
280         uint32_t len = (uint32_t)++length;
281         return (size_t)len == length && len && writebuf_uint32(wb, len) && writebuf_put(wb, value, length);
282 }
283
284 static int writebuf_string(struct writebuf *wb, const char *value)
285 {
286         return writebuf_string_length(wb, value, strlen(value));
287 }
288
289 static int writebuf_object(struct writebuf *wb, struct json_object *object)
290 {
291         const char *string = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
292         return string != NULL && writebuf_string(wb, string);
293 }
294
295 /******************* ws request part for server *****************/
296
297 /* decrement the reference count of the request and free/release it on falling to null */
298 static void server_req_destroy_cb(struct afb_xreq *xreq)
299 {
300         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
301
302         afb_context_disconnect(&wreq->xreq.context);
303         afb_cred_unref(wreq->xreq.cred);
304         json_object_put(wreq->xreq.json);
305         afb_stub_ws_unref(wreq->stubws);
306         free(wreq);
307 }
308
309 static void server_req_success_cb(struct afb_xreq *xreq, struct json_object *obj, const char *info)
310 {
311         int rc;
312         struct writebuf wb = { .count = 0 };
313         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
314
315         if (writebuf_char(&wb, CHAR_FOR_ANSWER_SUCCESS)
316          && writebuf_uint32(&wb, wreq->msgid)
317          && writebuf_uint32(&wb, (uint32_t)wreq->xreq.context.flags)
318          && writebuf_string(&wb, info ? : "")
319          && writebuf_object(&wb, obj)) {
320                 rc = afb_ws_binary_v(wreq->stubws->ws, wb.iovec, wb.count);
321                 if (rc >= 0)
322                         goto success;
323         }
324         ERROR("error while sending success");
325 success:
326         json_object_put(obj);
327 }
328
329 static void server_req_fail_cb(struct afb_xreq *xreq, const char *status, const char *info)
330 {
331         int rc;
332         struct writebuf wb = { .count = 0 };
333         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
334
335         if (writebuf_char(&wb, CHAR_FOR_ANSWER_FAIL)
336          && writebuf_uint32(&wb, wreq->msgid)
337          && writebuf_uint32(&wb, (uint32_t)wreq->xreq.context.flags)
338          && writebuf_string(&wb, status)
339          && writebuf_string(&wb, info ? : "")) {
340                 rc = afb_ws_binary_v(wreq->stubws->ws, wb.iovec, wb.count);
341                 if (rc >= 0)
342                         return;
343         }
344         ERROR("error while sending fail");
345 }
346
347 static void server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure)
348 {
349         int rc;
350         struct writebuf wb = { .count = 0 };
351         struct server_subcall *sc, *osc;
352         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
353         struct afb_stub_ws *stubws = wreq->stubws;
354
355         sc = malloc(sizeof *sc);
356         if (!sc) {
357
358         } else {
359                 sc->callback = callback;
360                 sc->closure = cb_closure;
361
362                 pthread_mutex_unlock(&stubws->mutex);
363                 sc->subcallid = ptr2id(sc);
364                 do {
365                         sc->subcallid++;
366                         osc = stubws->subcalls;
367                         while(osc && osc->subcallid != sc->subcallid)
368                                 osc = osc->next;
369                 } while (osc);
370                 sc->next = stubws->subcalls;
371                 stubws->subcalls = sc;
372                 pthread_mutex_unlock(&stubws->mutex);
373
374                 if (writebuf_char(&wb, CHAR_FOR_SUBCALL_CALL)
375                  && writebuf_uint32(&wb, wreq->msgid)
376                  && writebuf_uint32(&wb, sc->subcallid)
377                  && writebuf_string(&wb, api)
378                  && writebuf_string(&wb, verb)
379                  && writebuf_object(&wb, args)) {
380                         rc = afb_ws_binary_v(wreq->stubws->ws, wb.iovec, wb.count);
381                         if (rc >= 0)
382                                 return;
383                 }
384                 ERROR("error while sending fail");
385         }
386 }
387
388 static int server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event)
389 {
390         int rc, rc2;
391         struct writebuf wb = { .count = 0 };
392         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
393
394         rc = afb_evt_add_watch(wreq->stubws->listener, event);
395         if (rc < 0)
396                 return rc;
397
398         if (writebuf_char(&wb, CHAR_FOR_EVT_SUBSCRIBE)
399          && writebuf_uint32(&wb, wreq->msgid)
400          && writebuf_uint32(&wb, (uint32_t)afb_evt_event_id(event))
401          && writebuf_string(&wb, afb_evt_event_name(event))) {
402                 rc2 = afb_ws_binary_v(wreq->stubws->ws, wb.iovec, wb.count);
403                 if (rc2 >= 0)
404                         goto success;
405         }
406         ERROR("error while subscribing event");
407 success:
408         return rc;
409 }
410
411 static int server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event)
412 {
413         int rc, rc2;
414         struct writebuf wb = { .count = 0 };
415         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
416
417         if (writebuf_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE)
418          && writebuf_uint32(&wb, wreq->msgid)
419          && writebuf_uint32(&wb, (uint32_t)afb_evt_event_id(event))
420          && writebuf_string(&wb, afb_evt_event_name(event))) {
421                 rc2 = afb_ws_binary_v(wreq->stubws->ws, wb.iovec, wb.count);
422                 if (rc2 >= 0)
423                         goto success;
424         }
425         ERROR("error while subscribing event");
426 success:
427         rc = afb_evt_remove_watch(wreq->stubws->listener, event);
428         return rc;
429 }
430
431 static const struct afb_xreq_query_itf server_req_xreq_itf = {
432         .success = server_req_success_cb,
433         .fail = server_req_fail_cb,
434         .unref = server_req_destroy_cb,
435         .subcall = server_req_subcall_cb,
436         .subscribe = server_req_subscribe_cb,
437         .unsubscribe = server_req_unsubscribe_cb
438 };
439
440 /******************* client part **********************************/
441
442 /* search a memorized call */
443 static struct client_call *client_call_search(struct afb_stub_ws *stubws, uint32_t msgid)
444 {
445         struct client_call *call;
446
447         call = stubws->calls;
448         while (call != NULL && call->msgid != msgid)
449                 call = call->next;
450
451         return call;
452 }
453
454 /* search the event */
455 static struct client_event *client_event_search(struct afb_stub_ws *stubws, uint32_t eventid, const char *name)
456 {
457         struct client_event *ev;
458
459         ev = stubws->events;
460         while (ev != NULL && (ev->eventid != eventid || 0 != strcmp(afb_evt_event_name(ev->event), name)))
461                 ev = ev->next;
462
463         return ev;
464 }
465
466
467 /* allocates and init the memorizing call */
468 static struct client_call *client_call_make(struct afb_stub_ws *stubws, struct afb_xreq *xreq)
469 {
470         struct client_call *call;
471
472         call = malloc(sizeof *call);
473         if (call != NULL) {
474                 afb_xreq_addref(xreq);
475                 call->xreq = xreq;
476                 call->msgid = ptr2id(call);
477                 while(client_call_search(stubws, call->msgid) != NULL)
478                         call->msgid++;
479                 call->stubws = stubws;
480                 call->next = stubws->calls;
481                 stubws->calls = call;
482         }
483         return call;
484 }
485
486 /* free and release the memorizing call */
487 static void client_call_destroy(struct client_call *call)
488 {
489         struct client_call **prv;
490
491         prv = &call->stubws->calls;
492         while (*prv != NULL) {
493                 if (*prv == call) {
494                         *prv = call->next;
495                         break;
496                 }
497                 prv = &(*prv)->next;
498         }
499
500         afb_xreq_unref(call->xreq);
501         free(call);
502 }
503
504 /* get event data from the message */
505 static int client_msg_event_read(struct readbuf *rb, uint32_t *eventid, const char **name)
506 {
507         return readbuf_uint32(rb, eventid) && readbuf_string(rb, name, NULL);
508 }
509
510 /* get event from the message */
511 static int client_msg_event_get(struct afb_stub_ws *stubws, struct readbuf *rb, struct client_event **ev)
512 {
513         const char *name;
514         uint32_t eventid;
515
516         /* get event data from the message */
517         if (!client_msg_event_read(rb, &eventid, &name)) {
518                 ERROR("Invalid message");
519                 return 0;
520         }
521
522         /* check conflicts */
523         *ev = client_event_search(stubws, eventid, name);
524         if (*ev == NULL) {
525                 ERROR("event %s not found", name);
526                 return 0;
527         }
528
529         return 1;
530 }
531
532 /* get event from the message */
533 static int client_msg_call_get(struct afb_stub_ws *stubws, struct readbuf *rb, struct client_call **call)
534 {
535         uint32_t msgid;
536
537         /* get event data from the message */
538         if (!readbuf_uint32(rb, &msgid)) {
539                 ERROR("Invalid message");
540                 return 0;
541         }
542
543         /* get the call */
544         *call = client_call_search(stubws, msgid);
545         if (*call == NULL) {
546                 ERROR("message not found");
547                 return 0;
548         }
549
550         return 1;
551 }
552
553 /* read a subscrition message */
554 static int client_msg_subscription_get(struct afb_stub_ws *stubws, struct readbuf *rb, struct client_call **call, struct client_event **ev)
555 {
556         return client_msg_call_get(stubws, rb, call) && client_msg_event_get(stubws, rb, ev);
557 }
558
559 /* adds an event */
560 static void client_event_create(struct afb_stub_ws *stubws, struct readbuf *rb)
561 {
562         size_t offset;
563         const char *name;
564         uint32_t eventid;
565         struct client_event *ev;
566
567         /* get event data from the message */
568         offset = client_msg_event_read(rb, &eventid, &name);
569         if (offset == 0) {
570                 ERROR("Invalid message");
571                 return;
572         }
573
574         /* check conflicts */
575         ev = client_event_search(stubws, eventid, name);
576         if (ev != NULL) {
577                 ev->refcount++;
578                 return;
579         }
580
581         /* no conflict, try to add it */
582         ev = malloc(sizeof *ev);
583         if (ev != NULL) {
584                 ev->event = afb_evt_create_event(name);
585                 if (ev->event.closure == NULL)
586                         free(ev);
587                 else {
588                         ev->refcount = 1;
589                         ev->eventid = eventid;
590                         ev->next = stubws->events;
591                         stubws->events = ev;
592                         return;
593                 }
594         }
595         ERROR("can't create event %s, out of memory", name);
596 }
597
598 /* removes an event */
599 static void client_event_drop(struct afb_stub_ws *stubws, struct readbuf *rb)
600 {
601         struct client_event *ev, **prv;
602
603         /* retrieves the event */
604         if (!client_msg_event_get(stubws, rb, &ev))
605                 return;
606
607         /* decrease the reference count */
608         if (--ev->refcount)
609                 return;
610
611         /* unlinks the event */
612         prv = &stubws->events;
613         while (*prv != ev)
614                 prv = &(*prv)->next;
615         *prv = ev->next;
616
617         /* destroys the event */
618         afb_event_drop(ev->event);
619         free(ev);
620 }
621
622 /* subscribes an event */
623 static void client_event_subscribe(struct afb_stub_ws *stubws, struct readbuf *rb)
624 {
625         struct client_event *ev;
626         struct client_call *call;
627
628         if (client_msg_subscription_get(stubws, rb, &call, &ev)) {
629                 /* subscribe the request from the event */
630                 if (afb_xreq_subscribe(call->xreq, ev->event) < 0)
631                         ERROR("can't subscribe: %m");
632         }
633 }
634
635 /* unsubscribes an event */
636 static void client_event_unsubscribe(struct afb_stub_ws *stubws, struct readbuf *rb)
637 {
638         struct client_event *ev;
639         struct client_call *call;
640
641         if (client_msg_subscription_get(stubws, rb, &call, &ev)) {
642                 /* unsubscribe the request from the event */
643                 if (afb_xreq_unsubscribe(call->xreq, ev->event) < 0)
644                         ERROR("can't unsubscribe: %m");
645         }
646 }
647
648 /* receives broadcasted events */
649 static void client_event_broadcast(struct afb_stub_ws *stubws, struct readbuf *rb)
650 {
651         struct json_object *object;
652         const char *event;
653
654         if (readbuf_string(rb, &event, NULL) && readbuf_object(rb, &object))
655                 afb_evt_broadcast(event, object);
656         else
657                 ERROR("unreadable broadcasted event");
658 }
659
660 /* pushs an event */
661 static void client_event_push(struct afb_stub_ws *stubws, struct readbuf *rb)
662 {
663         struct client_event *ev;
664         struct json_object *object;
665
666         if (client_msg_event_get(stubws, rb, &ev) && readbuf_object(rb, &object))
667                 afb_event_push(ev->event, object);
668         else
669                 ERROR("unreadable push event");
670 }
671
672 static void client_reply_success(struct afb_stub_ws *stubws, struct readbuf *rb)
673 {
674         struct client_call *call;
675         struct json_object *object;
676         const char *info;
677         uint32_t flags;
678
679         /* retrieve the message data */
680         if (!client_msg_call_get(stubws, rb, &call))
681                 return;
682
683         if (readbuf_uint32(rb, &flags)
684          && readbuf_string(rb, &info, NULL)
685          && readbuf_object(rb, &object)) {
686                 call->xreq->context.flags = (unsigned)flags;
687                 afb_xreq_success(call->xreq, object, *info ? info : NULL);
688         } else {
689                 /* failing to have the answer */
690                 afb_xreq_fail(call->xreq, "error", "ws error");
691         }
692         client_call_destroy(call);
693 }
694
695 static void client_reply_fail(struct afb_stub_ws *stubws, struct readbuf *rb)
696 {
697         struct client_call *call;
698         const char *info, *status;
699         uint32_t flags;
700
701         /* retrieve the message data */
702         if (!client_msg_call_get(stubws, rb, &call))
703                 return;
704
705         if (readbuf_uint32(rb, &flags)
706          && readbuf_string(rb, &status, NULL)
707          && readbuf_string(rb, &info, NULL)) {
708                 call->xreq->context.flags = (unsigned)flags;
709                 afb_xreq_fail(call->xreq, status, *info ? info : NULL);
710         } else {
711                 /* failing to have the answer */
712                 afb_xreq_fail(call->xreq, "error", "ws error");
713         }
714         client_call_destroy(call);
715 }
716
717 /* send a subcall reply */
718 static void client_send_subcall_reply(struct client_subcall *subcall, int iserror, json_object *object)
719 {
720         int rc;
721         struct writebuf wb = { .count = 0 };
722         char ie = (char)!!iserror;
723
724         if (!writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY)
725          || !writebuf_uint32(&wb, subcall->subcallid)
726          || !writebuf_char(&wb, ie)
727          || !writebuf_object(&wb, object)) {
728                 /* write error ? */
729                 return;
730         }
731
732         rc = afb_ws_binary_v(subcall->stubws->ws, wb.iovec, wb.count);
733         if (rc >= 0)
734                 return;
735         ERROR("error while sending subcall reply");
736 }
737
738 /* callback for subcall reply */
739 static void client_subcall_reply_cb(void *closure, int iserror, json_object *object)
740 {
741         client_send_subcall_reply(closure, iserror, object);
742         free(closure);
743 }
744
745 /* received a subcall request */
746 static void client_subcall(struct afb_stub_ws *stubws, struct readbuf *rb)
747 {
748         struct client_subcall *subcall;
749         struct client_call *call;
750         const char *api, *verb;
751         uint32_t subcallid;
752         struct json_object *object;
753
754         subcall = malloc(sizeof *subcall);
755         if (!subcall)
756                 return;
757
758         /* retrieve the message data */
759         if (!client_msg_call_get(stubws, rb, &call))
760                 return;
761
762         if (readbuf_uint32(rb, &subcallid)
763          && readbuf_string(rb, &api, NULL)
764          && readbuf_string(rb, &verb, NULL)
765          && readbuf_object(rb, &object)) {
766                 subcall->stubws = stubws;
767                 subcall->subcallid = subcallid;
768                 afb_xreq_subcall(call->xreq, api, verb, object, client_subcall_reply_cb, subcall);
769         }
770 }
771
772 /* callback when receiving binary data */
773 static void client_on_binary(void *closure, char *data, size_t size)
774 {
775         if (size > 0) {
776                 struct afb_stub_ws *stubws = closure;
777                 struct readbuf rb = { .head = data, .end = data + size };
778
779                 pthread_mutex_lock(&stubws->mutex);
780                 switch (*rb.head++) {
781                 case CHAR_FOR_ANSWER_SUCCESS: /* success */
782                         client_reply_success(stubws, &rb);
783                         break;
784                 case CHAR_FOR_ANSWER_FAIL: /* fail */
785                         client_reply_fail(stubws, &rb);
786                         break;
787                 case CHAR_FOR_EVT_BROADCAST: /* broadcast */
788                         client_event_broadcast(stubws, &rb);
789                         break;
790                 case CHAR_FOR_EVT_ADD: /* creates the event */
791                         client_event_create(stubws, &rb);
792                         break;
793                 case CHAR_FOR_EVT_DEL: /* drops the event */
794                         client_event_drop(stubws, &rb);
795                         break;
796                 case CHAR_FOR_EVT_PUSH: /* pushs the event */
797                         client_event_push(stubws, &rb);
798                         break;
799                 case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
800                         client_event_subscribe(stubws, &rb);
801                         break;
802                 case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
803                         client_event_unsubscribe(stubws, &rb);
804                         break;
805                 case CHAR_FOR_SUBCALL_CALL: /* subcall */
806                         client_subcall(stubws, &rb);
807                         break;
808                 default: /* unexpected message */
809                         /* TODO: close the connection */
810                         break;
811                 }
812                 pthread_mutex_unlock(&stubws->mutex);
813         }
814         free(data);
815 }
816
817 /* on call, propagate it to the ws service */
818 static void client_call_cb(void * closure, struct afb_xreq *xreq)
819 {
820         int rc;
821         struct client_call *call;
822         struct writebuf wb = { .count = 0 };
823         const char *raw;
824         size_t szraw;
825         struct afb_stub_ws *stubws = closure;
826
827         pthread_mutex_lock(&stubws->mutex);
828
829         /* create the recording data */
830         call = client_call_make(stubws, xreq);
831         if (call == NULL) {
832                 afb_xreq_fail_f(xreq, "error", "out of memory");
833                 goto end;
834         }
835
836         /* creates the call message */
837         raw = afb_xreq_raw(xreq, &szraw);
838         if (raw == NULL)
839                 goto internal_error;
840         if (!writebuf_char(&wb, CHAR_FOR_CALL)
841          || !writebuf_uint32(&wb, call->msgid)
842          || !writebuf_uint32(&wb, (uint32_t)xreq->context.flags)
843          || !writebuf_string(&wb, xreq->verb)
844          || !writebuf_string(&wb, afb_session_uuid(xreq->context.session))
845          || !writebuf_string_length(&wb, raw, szraw))
846                 goto overflow;
847
848         /* send */
849         rc = afb_ws_binary_v(stubws->ws, wb.iovec, wb.count);
850         if (rc >= 0)
851                 goto end;
852
853         afb_xreq_fail(xreq, "error", "websocket sending error");
854         goto clean_call;
855
856 internal_error:
857         afb_xreq_fail(xreq, "error", "internal: raw is NULL!");
858         goto clean_call;
859
860 overflow:
861         afb_xreq_fail(xreq, "error", "overflow: size doesn't match 32 bits!");
862
863 clean_call:
864         client_call_destroy(call);
865 end:
866         pthread_mutex_unlock(&stubws->mutex);
867 }
868
869 /******************* client description part for server *****************************/
870
871 /* on call, propagate it to the ws service */
872 static void server_on_call(struct afb_stub_ws *stubws, struct readbuf *rb)
873 {
874         struct server_req *wreq;
875         char *cverb;
876         const char *uuid, *verb;
877         uint32_t flags, msgid;
878         size_t lenverb;
879         struct json_object *object;
880
881         afb_stub_ws_addref(stubws);
882
883         /* reads the call message data */
884         if (!readbuf_uint32(rb, &msgid)
885          || !readbuf_uint32(rb, &flags)
886          || !readbuf_string(rb, &verb, &lenverb)
887          || !readbuf_string(rb, &uuid, NULL)
888          || !readbuf_object(rb, &object))
889                 goto overflow;
890
891         /* create the request */
892         wreq = malloc(++lenverb + sizeof *wreq);
893         if (wreq == NULL)
894                 goto out_of_memory;
895
896         afb_xreq_init(&wreq->xreq, &server_req_xreq_itf);
897         wreq->stubws = stubws;
898         wreq->msgid = msgid;
899         cverb = (char*)&wreq[1];
900         memcpy(cverb, verb, lenverb);
901
902         /* init the context */
903         if (afb_context_connect(&wreq->xreq.context, uuid, NULL) < 0)
904                 goto unconnected;
905         wreq->xreq.context.flags = flags;
906
907         /* makes the call */
908         wreq->xreq.cred = afb_cred_addref(stubws->cred);
909         wreq->xreq.api = stubws->apiname;
910         wreq->xreq.verb = cverb;
911         wreq->xreq.json = object;
912         afb_xreq_process(&wreq->xreq, stubws->apiset);
913         return;
914
915 unconnected:
916         free(wreq);
917 out_of_memory:
918         json_object_put(object);
919 overflow:
920         afb_stub_ws_unref(stubws);
921 }
922
923 /* on subcall reply */
924 static void server_on_subcall_reply(struct afb_stub_ws *stubws, struct readbuf *rb)
925 {
926         char iserror;
927         uint32_t subcallid;
928         struct json_object *object;
929         struct server_subcall *sc, **psc;
930
931         /* reads the call message data */
932         if (!readbuf_uint32(rb, &subcallid)
933          || !readbuf_char(rb, &iserror)
934          || !readbuf_object(rb, &object)) {
935                 /* TODO bad protocol */
936                 return;
937         }
938
939         /* search the subcall and unlink it */
940         pthread_mutex_lock(&stubws->mutex);
941         psc = &stubws->subcalls;
942         while ((sc = *psc) && sc->subcallid != subcallid)
943                 psc = &sc->next;
944         if (!sc) {
945                 pthread_mutex_unlock(&stubws->mutex);
946                 /* TODO subcall not found */
947         } else {
948                 *psc = sc->next;
949                 pthread_mutex_unlock(&stubws->mutex);
950                 sc->callback(sc->closure, (int)iserror, object);
951                 free(sc);
952         }
953         json_object_put(object);
954 }
955
956 /* callback when receiving binary data */
957 static void server_on_binary(void *closure, char *data, size_t size)
958 {
959         if (size > 0) {
960                 struct readbuf rb = { .head = data, .end = data + size };
961                 switch (*rb.head++) {
962                 case CHAR_FOR_CALL:
963                         server_on_call(closure, &rb);
964                         break;
965                 case CHAR_FOR_SUBCALL_REPLY:
966                         server_on_subcall_reply(closure, &rb);
967                         break;
968                 default: /* unexpected message */
969                         /* TODO: close the connection */
970                         break;
971                 }
972         }
973         free(data);
974 }
975
976 /******************* server part: manage events **********************************/
977
978 static void server_event_send(struct afb_stub_ws *stubws, char order, const char *event, int eventid, const char *data)
979 {
980         int rc;
981         struct writebuf wb = { .count = 0 };
982
983         if (writebuf_char(&wb, order)
984          && writebuf_uint32(&wb, eventid)
985          && writebuf_string(&wb, event)
986          && (data == NULL || writebuf_string(&wb, data))) {
987                 rc = afb_ws_binary_v(stubws->ws, wb.iovec, wb.count);
988                 if (rc >= 0)
989                         return;
990         }
991         ERROR("error while sending %c for event %s", order, event);
992 }
993
994 static void server_event_add(void *closure, const char *event, int eventid)
995 {
996         server_event_send(closure, CHAR_FOR_EVT_ADD, event, eventid, NULL);
997 }
998
999 static void server_event_remove(void *closure, const char *event, int eventid)
1000 {
1001         server_event_send(closure, CHAR_FOR_EVT_DEL, event, eventid, NULL);
1002 }
1003
1004 static void server_event_push(void *closure, const char *event, int eventid, struct json_object *object)
1005 {
1006         const char *data = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
1007         server_event_send(closure, CHAR_FOR_EVT_PUSH, event, eventid, data ? : "null");
1008         json_object_put(object);
1009 }
1010
1011 static void server_event_broadcast(void *closure, const char *event, int eventid, struct json_object *object)
1012 {
1013         int rc;
1014         struct afb_stub_ws *stubws = closure;
1015
1016         struct writebuf wb = { .count = 0 };
1017
1018         if (writebuf_char(&wb, CHAR_FOR_EVT_BROADCAST) && writebuf_string(&wb, event) && writebuf_object(&wb, object)) {
1019                 rc = afb_ws_binary_v(stubws->ws, wb.iovec, wb.count);
1020                 if (rc < 0)
1021                         ERROR("error while broadcasting event %s", event);
1022         } else
1023                 ERROR("error while broadcasting event %s", event);
1024         json_object_put(object);
1025 }
1026
1027 /*****************************************************/
1028
1029 /* callback when receiving a hangup */
1030 static void server_on_hangup(void *closure)
1031 {
1032         struct afb_stub_ws *stubws = closure;
1033
1034         /* close the socket */
1035         if (stubws->fd >= 0) {
1036                 close(stubws->fd);
1037                 stubws->fd = -1;
1038         }
1039
1040         /* release the client */
1041         afb_stub_ws_unref(stubws);
1042 }
1043
1044 /*****************************************************/
1045
1046 /* the interface for events pushing */
1047 static const struct afb_evt_itf server_evt_itf = {
1048         .broadcast = server_event_broadcast,
1049         .push = server_event_push,
1050         .add = server_event_add,
1051         .remove = server_event_remove
1052 };
1053
1054 static const struct afb_ws_itf stub_ws_client_ws_itf =
1055 {
1056         .on_close = NULL,
1057         .on_text = NULL,
1058         .on_binary = client_on_binary,
1059         .on_error = NULL,
1060         .on_hangup = NULL
1061 };
1062
1063 static const struct afb_ws_itf server_ws_itf =
1064 {
1065         .on_close = NULL,
1066         .on_text = NULL,
1067         .on_binary = server_on_binary,
1068         .on_error = NULL,
1069         .on_hangup = server_on_hangup
1070 };
1071
1072 static struct afb_api_itf ws_api_itf = {
1073         .call = client_call_cb
1074 };
1075
1076 /*****************************************************/
1077
1078 static struct afb_stub_ws *afb_stub_ws_create(int fd, const char *apiname, struct afb_apiset *apiset, const struct afb_ws_itf *itf)
1079 {
1080         struct afb_stub_ws *stubws;
1081
1082         stubws = calloc(1, sizeof *stubws + strlen(apiname));
1083         if (stubws == NULL)
1084                 errno = ENOMEM;
1085         else {
1086                 fcntl(fd, F_SETFD, FD_CLOEXEC);
1087                 fcntl(fd, F_SETFL, O_NONBLOCK);
1088                 stubws->ws = afb_ws_create(afb_common_get_event_loop(), fd, itf, stubws);
1089                 if (stubws->ws != NULL) {
1090                         stubws->fd = fd;
1091                         strcpy(stubws->apiname, apiname);
1092                         stubws->apiset = afb_apiset_addref(apiset);
1093                         stubws->refcount = 1;
1094                         stubws->subcalls = NULL;
1095                         return stubws;
1096                 }
1097                 free(stubws);
1098         }
1099         return NULL;
1100 }
1101
1102 struct afb_stub_ws *afb_stub_ws_create_client(int fd, const char *apiname, struct afb_apiset *apiset)
1103 {
1104         struct afb_api afb_api;
1105         struct afb_stub_ws *stubws;
1106
1107         stubws = afb_stub_ws_create(fd, apiname, apiset, &stub_ws_client_ws_itf);
1108         if (stubws) {
1109                 afb_api.closure = stubws;
1110                 afb_api.itf = &ws_api_itf;
1111                 if (afb_apiset_add(apiset, stubws->apiname, afb_api) >= 0)
1112                         return stubws;
1113                 afb_stub_ws_unref(stubws);
1114         }
1115         return NULL;
1116
1117 }
1118
1119 struct afb_stub_ws *afb_stub_ws_create_server(int fd, const char *apiname, struct afb_apiset *apiset)
1120 {
1121         struct afb_stub_ws *stubws;
1122
1123         stubws = afb_stub_ws_create(fd, apiname, apiset, &server_ws_itf);
1124         if (stubws) {
1125                 stubws->cred = afb_cred_create_for_socket(fd);
1126                 stubws->listener = afb_evt_listener_create(&server_evt_itf, stubws);
1127                 if (stubws->listener != NULL)
1128                         return stubws;
1129                 afb_stub_ws_unref(stubws);
1130         }
1131         return NULL;
1132 }
1133
1134 void afb_stub_ws_unref(struct afb_stub_ws *stubws)
1135 {
1136         struct server_subcall *sc, *nsc;
1137
1138         if (!__atomic_sub_fetch(&stubws->refcount, 1, __ATOMIC_RELAXED)) {
1139                 afb_evt_listener_unref(stubws->listener);
1140                 afb_ws_destroy(stubws->ws);
1141                 nsc = stubws->subcalls;
1142                 while (nsc) {
1143                         sc= nsc;
1144                         nsc = sc->next;
1145                         sc->callback(sc->closure, 1, NULL);
1146                         free(sc);
1147                 }
1148                 afb_cred_unref(stubws->cred);
1149                 afb_apiset_unref(stubws->apiset);
1150                 free(stubws);
1151         }
1152 }
1153
1154 void afb_stub_ws_addref(struct afb_stub_ws *stubws)
1155 {
1156         __atomic_add_fetch(&stubws->refcount, 1, __ATOMIC_RELAXED);
1157 }
1158