Update date of copyright notices
[src/app-framework-binder.git] / src / afb-proto-ws.c
1 /*
2  * Copyright (C) 2015-2018 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #define NO_PLUGIN_VERBOSE_MACRO
21
22 #include <stdlib.h>
23 #include <string.h>
24 #include <assert.h>
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <errno.h>
28 #include <endian.h>
29 #include <netdb.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <sys/un.h>
33 #include <pthread.h>
34
35 #include <json-c/json.h>
36
37 #include "afb-ws.h"
38 #include "afb-msg-json.h"
39 #include "afb-proto-ws.h"
40 #include "jobs.h"
41 #include "fdev.h"
42
43 struct afb_proto_ws;
44
45 /******** implementation of internal binder protocol per api **************/
46 /*
47
This protocol is asymmetric: there is a client and a server
49
50 The client can require the following actions:
51
52   - call a verb
53
54   - ask for description
55
56 The server must reply to the previous actions by
57
58   - answering success or failure of the call
59
60   - answering the required description
61
62 The server can also within the context of a call
63
64   - make a subcall
65
66   - subscribe or unsubscribe an event
67
68 For the purpose of handling events the server can:
69
70   - create/destroy an event
71
  - push or broadcast data as an event
73
74 */
75 /************** constants for protocol definition *************************/
76
/*
 * First byte of every binary message: it identifies the kind of message.
 * The direction noted for each code is the one observed in this file
 * (who writes the code and who dispatches on it).
 */
#define CHAR_FOR_CALL             'C'  /* client -> server: call a verb */
#define CHAR_FOR_ANSWER_SUCCESS   'T'  /* server -> client: successful reply to a call */
#define CHAR_FOR_ANSWER_FAIL      'F'  /* server -> client: failure reply to a call */
#define CHAR_FOR_EVT_BROADCAST    '*'  /* server -> client: broadcast of an event */
#define CHAR_FOR_EVT_ADD          '+'  /* server -> client: creation of an event */
#define CHAR_FOR_EVT_DEL          '-'  /* server -> client: removal of an event */
#define CHAR_FOR_EVT_PUSH         '!'  /* server -> client: push of an event */
#define CHAR_FOR_EVT_SUBSCRIBE    'S'  /* server -> client: subscribe a call to an event */
#define CHAR_FOR_EVT_UNSUBSCRIBE  'U'  /* server -> client: unsubscribe a call from an event */
#define CHAR_FOR_SUBCALL_CALL     'B'  /* server -> client: subcall made within a call */
#define CHAR_FOR_SUBCALL_REPLY    'R'  /* client -> server: reply to a subcall */
#define CHAR_FOR_DESCRIBE         'D'  /* client -> server: request of description */
#define CHAR_FOR_DESCRIPTION      'd'  /* server -> client: the requested description */
90
91 /******************* handling subcalls *****************************/
92
/**
 * Structure on server side for recording pending
 * subcalls.
 *
 * Records are allocated and chained to the 'subcalls' list of the
 * afb_proto_ws by afb_proto_ws_call_subcall.
 */
struct server_subcall
{
        struct server_subcall *next;    /**< next pending subcall in the list of the afb_proto_ws */
        uint32_t subcallid;             /**< the subcallid, unique among the pending subcalls */
        void (*callback)(void*, int, struct json_object*); /**< callback on completion */
        void *closure;                  /**< closure of the callback */
};
104
/**
 * Structure for sending back replies on client side
 *
 * Created by client_on_subcall when a subcall request is received and
 * released by afb_proto_ws_subcall_reply.
 */
struct afb_proto_ws_subcall
{
        struct afb_proto_ws *protows;   /**< proto descriptor (a reference is held on it) */
        void *buffer;                   /**< the received message; the api and verb strings point into it; freed with the subcall */
        uint32_t subcallid;             /**< subcallid for the reply */
};
114
/*
 * structure for recording calls on client side
 *
 * Records are created by afb_proto_ws_client_call, chained in the
 * 'calls' list of the afb_proto_ws and released by client_call_destroy.
 */
struct client_call {
        struct client_call *next;       /* the next call */
        struct afb_proto_ws *protows;   /* the proto_ws */
        void *request;                  /* opaque request given by the caller, passed back to the client interface */
        uint32_t callid;                /* the message identifier */
};
124
/*
 * structure for a ws request
 *
 * Describes, on server side, a call received from the client.
 * It is reference counted: see afb_proto_ws_call_addref and
 * afb_proto_ws_call_unref.
 */
struct afb_proto_ws_call {
        struct client_call *next;       /* the next call (NOTE(review): not referenced in the code reviewed here) */
        struct afb_proto_ws *protows;   /* the client of the request */
        uint32_t refcount;              /* reference count */
        uint32_t callid;                /* the incoming request callid */
        char *buffer;                   /* the incoming buffer, freed when the last reference is dropped */
};
135
/*
 * structure for recording describe requests
 *
 * Records are created by afb_proto_ws_client_describe and consumed
 * by client_on_description when the description is received.
 */
struct client_describe
{
        struct client_describe *next;                   /* next pending describe request */
        struct afb_proto_ws *protows;                   /* the proto_ws the request was sent on */
        void (*callback)(void*, struct json_object*);   /* function receiving the description */
        void *closure;                                  /* closure of the callback */
        uint32_t descid;                                /* identifier of the request, unique among pending ones */
};
147
/*
 * structure for jobs of describing
 *
 * NOTE(review): not used in the part of the file reviewed here;
 * presumably the server side counterpart of client_describe - confirm.
 */
struct afb_proto_ws_describe
{
        struct afb_proto_ws *protows;   /* the proto_ws */
        uint32_t descid;                /* identifier of the describe request */
};
156
157 /******************* proto description for client or servers ******************/
158
/*
 * Description of one end (client or server) of the protocol.
 */
struct afb_proto_ws
{
        /* count of references */
        int refcount;

        /* file descriptor (wrapped in a fdev object) */
        struct fdev *fdev;

        /* resource control: taken when sending on 'ws' and when
         * reading or changing the lists 'calls', 'subcalls', 'describes' */
        pthread_mutex_t mutex;

        /* websocket */
        struct afb_ws *ws;

        /* the client closure: first argument given to the callbacks
         * of client_itf and server_itf */
        void *closure;

        /* the client side interface */
        const struct afb_proto_ws_client_itf *client_itf;

        /* the server side interface */
        const struct afb_proto_ws_server_itf *server_itf;

        /* emitted calls (client side) */
        struct client_call *calls;

        /* pending subcalls (server side) */
        struct server_subcall *subcalls;

        /* pending description (client side) */
        struct client_describe *describes;

        /* on hangup callback */
        void (*on_hangup)(void *closure);
};
194
195 /******************* streaming objects **********************************/
196
/* maximum count of items that one outgoing message can gather */
#define WRITEBUF_COUNT_MAX  32

/*
 * Gathering buffer for writing one message.
 * Items are not copied: each entry of 'iovec' points either to the
 * caller's data or to the cell of 'uints' having the same index.
 */
struct writebuf
{
        struct iovec iovec[WRITEBUF_COUNT_MAX]; /* the items to send, in order */
        uint32_t uints[WRITEBUF_COUNT_MAX];     /* storage for the char and uint32 items */
        int count;                              /* count of items recorded */
};
204
/*
 * Cursor for reading a received message.
 *  - base: start of the received buffer (the pointer that is freed)
 *  - head: current reading position
 *  - end:  first byte after the message
 */
struct readbuf
{
        char *base, *head, *end;
};
209
/*
 * Closure of the job processing a received binary message:
 * allocated by client_on_binary, released by client_on_binary_job.
 */
struct binary
{
        struct afb_proto_ws *protows;   /* the proto_ws that received the message */
        struct readbuf rb;              /* the message to process */
};
215
216 /******************* common useful tools **********************************/
217
/**
 * Derive a 32 bits identifier from an address.
 *
 * The 6 lower bits of the address are dropped and the result is
 * truncated to 32 bits.
 *
 * @param ptr the pointer to translate
 * @return the computed identifier
 */
static inline uint32_t ptr2id(void *ptr)
{
        intptr_t address = (intptr_t)ptr;

        return (uint32_t)(address >> 6);
}
227
228 /******************* serialisation part **********************************/
229
230 static char *readbuf_get(struct readbuf *rb, uint32_t length)
231 {
232         char *before = rb->head;
233         char *after = before + length;
234         if (after > rb->end)
235                 return 0;
236         rb->head = after;
237         return before;
238 }
239
240 static int readbuf_char(struct readbuf *rb, char *value)
241 {
242         if (rb->head >= rb->end)
243                 return 0;
244         *value = *rb->head++;
245         return 1;
246 }
247
248 static int readbuf_uint32(struct readbuf *rb, uint32_t *value)
249 {
250         char *after = rb->head + sizeof *value;
251         if (after > rb->end)
252                 return 0;
253         memcpy(value, rb->head, sizeof *value);
254         rb->head = after;
255         *value = le32toh(*value);
256         return 1;
257 }
258
/**
 * Read a string of the message.
 *
 * A string is recorded as its size on 32 bits (counting the
 * terminating zero, so never null) followed by its bytes.
 * The string is not copied: the returned pointer refers to the
 * message buffer.
 *
 * @param rb     the reading cursor
 * @param value  where to store the address of the string
 * @param length if not NULL, where to store the length of the string
 *               (without the terminating zero)
 * @return 1 on success or 0 if the message is truncated or if the
 *         string is not zero terminated
 */
static int readbuf_string(struct readbuf *rb, const char **value, size_t *length)
{
        uint32_t size;
        char *text;

        if (!readbuf_uint32(rb, &size))
                return 0;
        if (size == 0)
                return 0;
        if (length != NULL)
                *length = (size_t)(size - 1);

        text = readbuf_get(rb, size);
        *value = text;
        if (text == NULL)
                return 0;

        /* the last byte must be the terminating zero */
        return text[size - 1] == 0;
}
268
/**
 * Read a JSON object of the message.
 *
 * The object is transmitted as a string holding its JSON text.
 * A text that can not be parsed, except the text "null", is
 * returned as a JSON string holding that text.
 *
 * @param rb     the reading cursor
 * @param object where to store the object read (untouched on failure)
 * @return 1 on success or 0 if the string can not be read
 */
static int readbuf_object(struct readbuf *rb, struct json_object **object)
{
        const char *text;
        struct json_object *parsed;

        if (!readbuf_string(rb, &text, NULL))
                return 0;

        parsed = json_tokener_parse(text);
        if (parsed == NULL && strcmp(text, "null") != 0)
                parsed = json_object_new_string(text);
        *object = parsed;
        return 1;
}
282
283 static int writebuf_put(struct writebuf *wb, const void *value, size_t length)
284 {
285         int i = wb->count;
286         if (i == WRITEBUF_COUNT_MAX)
287                 return 0;
288         wb->iovec[i].iov_base = (void*)value;
289         wb->iovec[i].iov_len = length;
290         wb->count = i + 1;
291         return 1;
292 }
293
294 static int writebuf_char(struct writebuf *wb, char value)
295 {
296         int i = wb->count;
297         if (i == WRITEBUF_COUNT_MAX)
298                 return 0;
299         *(char*)&wb->uints[i] = value;
300         wb->iovec[i].iov_base = &wb->uints[i];
301         wb->iovec[i].iov_len = 1;
302         wb->count = i + 1;
303         return 1;
304 }
305
306 static int writebuf_uint32(struct writebuf *wb, uint32_t value)
307 {
308         int i = wb->count;
309         if (i == WRITEBUF_COUNT_MAX)
310                 return 0;
311         wb->uints[i] = htole32(value);
312         wb->iovec[i].iov_base = &wb->uints[i];
313         wb->iovec[i].iov_len = sizeof wb->uints[i];
314         wb->count = i + 1;
315         return 1;
316 }
317
/**
 * Append a string of known length to the message.
 *
 * The string is recorded as its size on 32 bits, counting the
 * terminating zero, followed by its bytes including that zero.
 *
 * @param wb     the writing buffer
 * @param value  the zero terminated string
 * @param length the length of the string, without the terminating zero
 * @return 1 on success or 0 if the size does not fit 32 bits or if
 *         the buffer is full
 */
static int writebuf_string_length(struct writebuf *wb, const char *value, size_t length)
{
        size_t total = length + 1; /* count the terminating zero */
        uint32_t wire = (uint32_t)total;

        /* reject the sizes that are truncated or wrapped to zero */
        if ((size_t)wire != total || wire == 0)
                return 0;
        if (!writebuf_uint32(wb, wire))
                return 0;
        return writebuf_put(wb, value, total);
}
323
/**
 * Append a zero terminated string to the message.
 *
 * @param wb    the writing buffer
 * @param value the string (must not be NULL: its length is computed)
 * @return 1 on success or 0 on failure
 */
static int writebuf_string(struct writebuf *wb, const char *value)
{
        size_t length = strlen(value);

        return writebuf_string_length(wb, value, length);
}
328
329 static int writebuf_object(struct writebuf *wb, struct json_object *object)
330 {
331         const char *string = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
332         return string != NULL && writebuf_string(wb, string);
333 }
334
335 /******************* ws request part for server *****************/
336
337 void afb_proto_ws_call_addref(struct afb_proto_ws_call *call)
338 {
339         __atomic_add_fetch(&call->refcount, 1, __ATOMIC_RELAXED);
340 }
341
342 void afb_proto_ws_call_unref(struct afb_proto_ws_call *call)
343 {
344         if (__atomic_sub_fetch(&call->refcount, 1, __ATOMIC_RELAXED))
345                 return;
346
347         afb_proto_ws_unref(call->protows);
348         free(call->buffer);
349         free(call);
350 }
351
352 int afb_proto_ws_call_success(struct afb_proto_ws_call *call, struct json_object *obj, const char *info)
353 {
354         int rc = -1;
355         struct writebuf wb = { .count = 0 };
356         struct afb_proto_ws *protows = call->protows;
357
358         if (writebuf_char(&wb, CHAR_FOR_ANSWER_SUCCESS)
359          && writebuf_uint32(&wb, call->callid)
360          && writebuf_string(&wb, info ?: "")
361          && writebuf_object(&wb, obj)) {
362                 pthread_mutex_lock(&protows->mutex);
363                 rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
364                 pthread_mutex_unlock(&protows->mutex);
365                 if (rc >= 0) {
366                         rc = 0;
367                         goto success;
368                 }
369         }
370 success:
371         return rc;
372 }
373
374 int afb_proto_ws_call_fail(struct afb_proto_ws_call *call, const char *status, const char *info)
375 {
376         int rc = -1;
377         struct writebuf wb = { .count = 0 };
378         struct afb_proto_ws *protows = call->protows;
379
380         if (writebuf_char(&wb, CHAR_FOR_ANSWER_FAIL)
381          && writebuf_uint32(&wb, call->callid)
382          && writebuf_string(&wb, status)
383          && writebuf_string(&wb, info ? : "")) {
384                 pthread_mutex_lock(&protows->mutex);
385                 rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
386                 pthread_mutex_unlock(&protows->mutex);
387                 if (rc >= 0) {
388                         rc = 0;
389                         goto success;
390                 }
391         }
392 success:
393         return rc;
394 }
395
396 int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure)
397 {
398         int rc = -1;
399         struct writebuf wb = { .count = 0 };
400         struct server_subcall *sc, *osc;
401         struct afb_proto_ws *protows = call->protows;
402
403         sc = malloc(sizeof *sc);
404         if (!sc)
405                 errno = ENOMEM;
406         else {
407                 sc->callback = callback;
408                 sc->closure = cb_closure;
409
410                 pthread_mutex_lock(&protows->mutex);
411                 sc->subcallid = ptr2id(sc);
412                 do {
413                         sc->subcallid++;
414                         osc = protows->subcalls;
415                         while(osc && osc->subcallid != sc->subcallid)
416                                 osc = osc->next;
417                 } while (osc);
418                 sc->next = protows->subcalls;
419                 protows->subcalls = sc;
420                 pthread_mutex_unlock(&protows->mutex);
421
422                 if (writebuf_char(&wb, CHAR_FOR_SUBCALL_CALL)
423                  && writebuf_uint32(&wb, sc->subcallid)
424                  && writebuf_uint32(&wb, call->callid)
425                  && writebuf_string(&wb, api)
426                  && writebuf_string(&wb, verb)
427                  && writebuf_object(&wb, args)) {
428                         pthread_mutex_lock(&protows->mutex);
429                         rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
430                         pthread_mutex_unlock(&protows->mutex);
431                         if (rc >= 0) {
432                                 rc = 0;
433                                 goto success;
434                         }
435                 }
436         }
437 success:
438         return rc;
439 }
440
441 int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, const char *event_name, int event_id)
442 {
443         int rc = -1;
444         struct writebuf wb = { .count = 0 };
445         struct afb_proto_ws *protows = call->protows;
446
447         if (writebuf_char(&wb, CHAR_FOR_EVT_SUBSCRIBE)
448          && writebuf_uint32(&wb, call->callid)
449          && writebuf_uint32(&wb, (uint32_t)event_id)
450          && writebuf_string(&wb, event_name)) {
451                 pthread_mutex_lock(&protows->mutex);
452                 rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
453                 pthread_mutex_unlock(&protows->mutex);
454                 if (rc >= 0) {
455                         rc = 0;
456                         goto success;
457                 }
458         }
459 success:
460         return rc;
461 }
462
463 int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *event_name, int event_id)
464 {
465         int rc = -1;
466         struct writebuf wb = { .count = 0 };
467         struct afb_proto_ws *protows = call->protows;
468
469         if (writebuf_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE)
470          && writebuf_uint32(&wb, call->callid)
471          && writebuf_uint32(&wb, (uint32_t)event_id)
472          && writebuf_string(&wb, event_name)) {
473                 pthread_mutex_lock(&protows->mutex);
474                 rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
475                 pthread_mutex_unlock(&protows->mutex);
476                 if (rc >= 0) {
477                         rc = 0;
478                         goto success;
479                 }
480         }
481 success:
482         return rc;
483 }
484
485 /******************* client part **********************************/
486
487 /* search a memorized call */
488 static struct client_call *client_call_search_locked(struct afb_proto_ws *protows, uint32_t callid)
489 {
490         struct client_call *call;
491
492         call = protows->calls;
493         while (call != NULL && call->callid != callid)
494                 call = call->next;
495
496         return call;
497 }
498
499 static struct client_call *client_call_search_unlocked(struct afb_proto_ws *protows, uint32_t callid)
500 {
501         struct client_call *result;
502
503         pthread_mutex_lock(&protows->mutex);
504         result = client_call_search_locked(protows, callid);
505         pthread_mutex_unlock(&protows->mutex);
506         return result;
507 }
508
509 /* free and release the memorizing call */
510 static void client_call_destroy(struct client_call *call)
511 {
512         struct client_call **prv;
513         struct afb_proto_ws *protows = call->protows;
514
515         pthread_mutex_lock(&protows->mutex);
516         prv = &call->protows->calls;
517         while (*prv != NULL) {
518                 if (*prv == call) {
519                         *prv = call->next;
520                         break;
521                 }
522                 prv = &(*prv)->next;
523         }
524         pthread_mutex_unlock(&protows->mutex);
525         free(call);
526 }
527
/*
 * Read the data of an event from the message: its identifier then
 * its name. Returns 1 on success or 0 on failure.
 */
static int client_msg_event_read(struct readbuf *rb, uint32_t *eventid, const char **name)
{
        if (!readbuf_uint32(rb, eventid))
                return 0;
        return readbuf_string(rb, name, NULL);
}
533
/*
 * Read the identifier of a call from the message and get the
 * memorized call matching it.
 * Returns 1 on success or 0 if the identifier can not be read or
 * if no memorized call matches it.
 */
static int client_msg_call_get(struct afb_proto_ws *protows, struct readbuf *rb, struct client_call **call)
{
        uint32_t callid;
        struct client_call *found;

        /* get the identifier of the call */
        if (!readbuf_uint32(rb, &callid))
                return 0;

        /* get the call */
        found = client_call_search_unlocked(protows, callid);
        *call = found;
        return found != NULL;
}
552
553 /* adds an event */
554 static void client_on_event_create(struct afb_proto_ws *protows, struct readbuf *rb)
555 {
556         const char *event_name;
557         uint32_t event_id;
558
559         if (protows->client_itf->on_event_create && client_msg_event_read(rb, &event_id, &event_name))
560                 protows->client_itf->on_event_create(protows->closure, event_name, (int)event_id);
561 }
562
563 /* removes an event */
564 static void client_on_event_remove(struct afb_proto_ws *protows, struct readbuf *rb)
565 {
566         const char *event_name;
567         uint32_t event_id;
568
569         if (protows->client_itf->on_event_remove && client_msg_event_read(rb, &event_id, &event_name))
570                 protows->client_itf->on_event_remove(protows->closure, event_name, (int)event_id);
571 }
572
573 /* subscribes an event */
574 static void client_on_event_subscribe(struct afb_proto_ws *protows, struct readbuf *rb)
575 {
576         const char *event_name;
577         uint32_t event_id;
578         struct client_call *call;
579
580         if (protows->client_itf->on_event_subscribe && client_msg_call_get(protows, rb, &call) && client_msg_event_read(rb, &event_id, &event_name))
581                 protows->client_itf->on_event_subscribe(protows->closure, call->request, event_name, (int)event_id);
582 }
583
584 /* unsubscribes an event */
585 static void client_on_event_unsubscribe(struct afb_proto_ws *protows, struct readbuf *rb)
586 {
587         const char *event_name;
588         uint32_t event_id;
589         struct client_call *call;
590
591         if (protows->client_itf->on_event_unsubscribe && client_msg_call_get(protows, rb, &call) && client_msg_event_read(rb, &event_id, &event_name))
592                 protows->client_itf->on_event_unsubscribe(protows->closure, call->request, event_name, (int)event_id);
593 }
594
595 /* receives broadcasted events */
596 static void client_on_event_broadcast(struct afb_proto_ws *protows, struct readbuf *rb)
597 {
598         const char *event_name;
599         struct json_object *object;
600
601         if (protows->client_itf->on_event_broadcast && readbuf_string(rb, &event_name, NULL) && readbuf_object(rb, &object))
602                 protows->client_itf->on_event_broadcast(protows->closure, event_name, object);
603 }
604
605 /* pushs an event */
606 static void client_on_event_push(struct afb_proto_ws *protows, struct readbuf *rb)
607 {
608         const char *event_name;
609         uint32_t event_id;
610         struct json_object *object;
611
612         if (protows->client_itf->on_event_push && client_msg_event_read(rb, &event_id, &event_name) && readbuf_object(rb, &object))
613                 protows->client_itf->on_event_push(protows->closure, event_name, (int)event_id, object);
614 }
615
616 static void client_on_reply_success(struct afb_proto_ws *protows, struct readbuf *rb)
617 {
618         struct client_call *call;
619         struct json_object *object;
620         const char *info;
621
622         if (!client_msg_call_get(protows, rb, &call))
623                 return;
624
625         if (readbuf_string(rb, &info, NULL) && readbuf_object(rb, &object)) {
626                 protows->client_itf->on_reply_success(protows->closure, call->request, object, info);
627         } else {
628                 protows->client_itf->on_reply_fail(protows->closure, call->request, "proto-error", "can't process success");
629         }
630         client_call_destroy(call);
631 }
632
633 static void client_on_reply_fail(struct afb_proto_ws *protows, struct readbuf *rb)
634 {
635         struct client_call *call;
636         const char *info, *status;
637
638         if (!client_msg_call_get(protows, rb, &call))
639                 return;
640         
641
642         if (readbuf_string(rb, &status, NULL) && readbuf_string(rb, &info, NULL)) {
643                 protows->client_itf->on_reply_fail(protows->closure, call->request, status, info);
644         } else {
645                 protows->client_itf->on_reply_fail(protows->closure, call->request, "proto-error", "can't process fail");
646         }
647         client_call_destroy(call);
648 }
649
650 /* send a subcall reply */
651 static int client_send_subcall_reply(struct afb_proto_ws *protows, uint32_t subcallid, int status, json_object *object)
652 {
653         struct writebuf wb = { .count = 0 };
654         char ie = status < 0;
655         int rc;
656
657         if (writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY)
658          && writebuf_uint32(&wb, subcallid)
659          && writebuf_char(&wb, ie)
660          && writebuf_object(&wb, object)) {
661                 pthread_mutex_lock(&protows->mutex);
662                 rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
663                 pthread_mutex_unlock(&protows->mutex);
664                 if (rc >= 0)
665                         return 0;
666         }
667         return -1;
668 }
669
670 /* callback for subcall reply */
671 int afb_proto_ws_subcall_reply(struct afb_proto_ws_subcall *subcall, int status, struct json_object *result)
672 {
673         int rc = client_send_subcall_reply(subcall->protows, subcall->subcallid, status, result);
674         afb_proto_ws_unref(subcall->protows);
675         free(subcall->buffer);
676         free(subcall);
677         return rc;
678 }
679
680 /* received a subcall request */
681 static void client_on_subcall(struct afb_proto_ws *protows, struct readbuf *rb)
682 {
683         struct afb_proto_ws_subcall *subcall;
684         struct client_call *call;
685         const char *api, *verb;
686         uint32_t subcallid;
687         struct json_object *object;
688
689         /* get the subcallid */
690         if (!readbuf_uint32(rb, &subcallid))
691                 return;
692
693         /* if not expected drop it */
694         if (!protows->client_itf->on_subcall)
695                 goto error;
696
697         /* retrieve the message data */
698         if (!client_msg_call_get(protows, rb, &call))
699                 goto error;
700
701         /* allocation of the subcall */
702         subcall = malloc(sizeof *subcall);
703         if (!subcall)
704                 goto error;
705
706         /* make the call */
707         if (readbuf_string(rb, &api, NULL)
708          && readbuf_string(rb, &verb, NULL)
709          && readbuf_object(rb, &object)) {
710                 afb_proto_ws_addref(protows);
711                 subcall->protows = protows;
712                 subcall->subcallid = subcallid;
713                 subcall->buffer = rb->base;
714                 rb->base = NULL;
715                 protows->client_itf->on_subcall(protows->closure, subcall, call->request, api, verb, object);
716                 return;
717         }
718         free(subcall);
719 error:
720         client_send_subcall_reply(protows, subcallid, 1, NULL);
721 }
722
723 static void client_on_description(struct afb_proto_ws *protows, struct readbuf *rb)
724 {
725         uint32_t descid;
726         struct client_describe *desc, **prv;
727         struct json_object *object;
728
729         if (readbuf_uint32(rb, &descid)) {
730                 pthread_mutex_lock(&protows->mutex);
731                 prv = &protows->describes;
732                 while ((desc = *prv) && desc->descid != descid)
733                         prv = &desc->next;
734                 if (!desc)
735                         pthread_mutex_unlock(&protows->mutex);
736                 else {
737                         *prv = desc->next;
738                         pthread_mutex_unlock(&protows->mutex);
739                         if (!readbuf_object(rb, &object))
740                                 object = NULL;
741                         desc->callback(desc->closure, object);
742                         free(desc);
743                 }
744         }
745 }
746
747 /* callback when receiving binary data */
748 static void client_on_binary_job(int sig, void *closure)
749 {
750         struct binary *binary = closure;
751
752         if (!sig) {
753                 switch (*binary->rb.head++) {
754                 case CHAR_FOR_ANSWER_SUCCESS: /* success */
755                         client_on_reply_success(binary->protows, &binary->rb);
756                         break;
757                 case CHAR_FOR_ANSWER_FAIL: /* fail */
758                         client_on_reply_fail(binary->protows, &binary->rb);
759                         break;
760                 case CHAR_FOR_EVT_BROADCAST: /* broadcast */
761                         client_on_event_broadcast(binary->protows, &binary->rb);
762                         break;
763                 case CHAR_FOR_EVT_ADD: /* creates the event */
764                         client_on_event_create(binary->protows, &binary->rb);
765                         break;
766                 case CHAR_FOR_EVT_DEL: /* removes the event */
767                         client_on_event_remove(binary->protows, &binary->rb);
768                         break;
769                 case CHAR_FOR_EVT_PUSH: /* pushs the event */
770                         client_on_event_push(binary->protows, &binary->rb);
771                         break;
772                 case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
773                         client_on_event_subscribe(binary->protows, &binary->rb);
774                         break;
775                 case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
776                         client_on_event_unsubscribe(binary->protows, &binary->rb);
777                         break;
778                 case CHAR_FOR_SUBCALL_CALL: /* subcall */
779                         client_on_subcall(binary->protows, &binary->rb);
780                         break;
781                 case CHAR_FOR_DESCRIPTION: /* description */
782                         client_on_description(binary->protows, &binary->rb);
783                         break;
784                 default: /* unexpected message */
785                         /* TODO: close the connection */
786                         break;
787                 }
788         }
789         free(binary->rb.base);
790         free(binary);
791 }
792
793 /* callback when receiving binary data */
794 static void client_on_binary(void *closure, char *data, size_t size)
795 {
796         int rc;
797         struct binary *binary;
798
799         if (size) {
800                 binary = malloc(sizeof *binary);
801                 if (!binary) {
802                         errno = ENOMEM;
803                 } else {
804                         binary->protows = closure;
805                         binary->rb.base = data;
806                         binary->rb.head = data;
807                         binary->rb.end = data + size;
808                         rc = jobs_queue(NULL, 0, client_on_binary_job, binary);
809                         if (rc >= 0)
810                                 return;
811                         free(binary);
812                 }
813         }
814         free(data);
815 }
816
817 int afb_proto_ws_client_call(
818                 struct afb_proto_ws *protows,
819                 const char *verb,
820                 struct json_object *args,
821                 const char *sessionid,
822                 void *request
823 )
824 {
825         int rc = -1;
826         struct client_call *call;
827         struct writebuf wb = { .count = 0 };
828
829         /* allocate call data */
830         call = malloc(sizeof *call);
831         if (call == NULL) {
832                 errno = ENOMEM;
833                 return -1;
834         }
835         call->request = request;
836
837         /* init call data */
838         pthread_mutex_lock(&protows->mutex);
839         call->callid = ptr2id(call);
840         while(client_call_search_locked(protows, call->callid) != NULL)
841                 call->callid++;
842         call->protows = protows;
843         call->next = protows->calls;
844         protows->calls = call;
845         pthread_mutex_unlock(&protows->mutex);
846
847         /* creates the call message */
848         if (!writebuf_char(&wb, CHAR_FOR_CALL)
849          || !writebuf_uint32(&wb, call->callid)
850          || !writebuf_string(&wb, verb)
851          || !writebuf_string(&wb, sessionid)
852          || !writebuf_object(&wb, args)) {
853                 errno = EINVAL;
854                 goto clean;
855         }
856
857         /* send */
858         pthread_mutex_lock(&protows->mutex);
859         rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
860         pthread_mutex_unlock(&protows->mutex);
861         if (rc >= 0) {
862                 rc = 0;
863                 goto end;
864         }
865
866 clean:
867         client_call_destroy(call);
868 end:
869         return rc;
870 }
871
872 /* get the description */
873 int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(void*, struct json_object*), void *closure)
874 {
875         struct client_describe *desc, *d;
876         struct writebuf wb = { .count = 0 };
877
878         desc = malloc(sizeof *desc);
879         if (!desc) {
880                 errno = ENOMEM;
881                 goto error;
882         }
883
884         /* fill in stack the description of the task */
885         pthread_mutex_lock(&protows->mutex);
886         desc->descid = ptr2id(desc);
887         d = protows->describes;
888         while (d) {
889                 if (d->descid != desc->descid)
890                         d = d->next;
891                 else {
892                         desc->descid++;
893                         d = protows->describes;
894                 }
895         }
896         desc->callback = callback;
897         desc->closure = closure;
898         desc->protows = protows;
899         desc->next = protows->describes;
900         protows->describes = desc;
901
902         /* send */
903         if (writebuf_char(&wb, CHAR_FOR_DESCRIBE)
904          && writebuf_uint32(&wb, desc->descid)
905          && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0) {
906                 pthread_mutex_unlock(&protows->mutex);
907                 return 0;
908         }
909
910         d = protows->describes;
911         if (d == desc)
912                 protows->describes = desc->next;
913         else {
914                 while(d && d->next != desc)
915                         d = d->next;
916                 if (d)
917                         d->next = desc->next;
918         }
919         pthread_mutex_unlock(&protows->mutex);
920         free(desc);
921 error:
922         /* TODO? callback(closure, NULL); */
923         return -1;
924 }
925
926 /******************* client description part for server *****************************/
927
928 /* on call, propagate it to the ws service */
929 static void server_on_call(struct afb_proto_ws *protows, struct readbuf *rb)
930 {
931         struct afb_proto_ws_call *call;
932         const char *uuid, *verb;
933         uint32_t callid;
934         size_t lenverb;
935         struct json_object *object;
936
937         afb_proto_ws_addref(protows);
938
939         /* reads the call message data */
940         if (!readbuf_uint32(rb, &callid)
941          || !readbuf_string(rb, &verb, &lenverb)
942          || !readbuf_string(rb, &uuid, NULL)
943          || !readbuf_object(rb, &object))
944                 goto overflow;
945
946         /* create the request */
947         call = malloc(sizeof *call);
948         if (call == NULL)
949                 goto out_of_memory;
950
951         call->protows = protows;
952         call->callid = callid;
953         call->refcount = 1;
954         call->buffer = rb->base;
955         rb->base = NULL; /* don't free the buffer */
956
957         protows->server_itf->on_call(protows->closure, call, verb, object, uuid);
958         return;
959
960 out_of_memory:
961         json_object_put(object);
962
963 overflow:
964         afb_proto_ws_unref(protows);
965 }
966
967 /* on subcall reply */
968 static void server_on_subcall_reply(struct afb_proto_ws *protows, struct readbuf *rb)
969 {
970         char ie;
971         uint32_t subcallid;
972         struct json_object *object;
973         struct server_subcall *sc, **psc;
974
975         /* reads the call message data */
976         if (!readbuf_uint32(rb, &subcallid)
977          || !readbuf_char(rb, &ie)
978          || !readbuf_object(rb, &object)) {
979                 /* TODO bad protocol */
980                 return;
981         }
982
983         /* search the subcall and unlink it */
984         pthread_mutex_lock(&protows->mutex);
985         psc = &protows->subcalls;
986         while ((sc = *psc) && sc->subcallid != subcallid)
987                 psc = &sc->next;
988         if (!sc) {
989                 pthread_mutex_unlock(&protows->mutex);
990                 json_object_put(object);
991                 /* TODO subcall not found */
992         } else {
993                 *psc = sc->next;
994                 pthread_mutex_unlock(&protows->mutex);
995                 sc->callback(sc->closure, -(int)ie, object);
996                 free(sc);
997         }
998 }
999
1000 static int server_send_description(struct afb_proto_ws *protows, uint32_t descid, struct json_object *descobj)
1001 {
1002         int rc;
1003         struct writebuf wb = { .count = 0 };
1004
1005         if (writebuf_char(&wb, CHAR_FOR_DESCRIPTION)
1006          && writebuf_uint32(&wb, descid)
1007          && writebuf_object(&wb, descobj)) {
1008                 pthread_mutex_lock(&protows->mutex);
1009                 rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
1010                 pthread_mutex_unlock(&protows->mutex);
1011                 if (rc >= 0)
1012                         return 0;
1013         }
1014         return -1;
1015 }
1016
1017 int afb_proto_ws_describe_put(struct afb_proto_ws_describe *describe, struct json_object *description)
1018 {
1019         int rc = server_send_description(describe->protows, describe->descid, description);
1020         afb_proto_ws_unref(describe->protows);
1021         free(describe);
1022         return rc;
1023 }
1024
1025 /* on describe, propagate it to the ws service */
1026 static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb)
1027 {
1028         uint32_t descid;
1029         struct afb_proto_ws_describe *desc;
1030
1031         /* reads the descid */
1032         if (readbuf_uint32(rb, &descid)) {
1033                 if (protows->server_itf->on_describe) {
1034                         /* create asynchronous job */
1035                         desc = malloc(sizeof *desc);
1036                         if (desc) {
1037                                 desc->descid = descid;
1038                                 desc->protows = protows;
1039                                 afb_proto_ws_addref(protows);
1040                                 protows->server_itf->on_describe(protows->closure, desc);
1041                                 return;
1042                         }
1043                 }
1044                 server_send_description(protows, descid, NULL);
1045         }
1046 }
1047
1048 /* callback when receiving binary data */
1049 static void server_on_binary_job(int sig, void *closure)
1050 {
1051         struct binary *binary = closure;
1052
1053         if (!sig) {
1054                 switch (*binary->rb.head++) {
1055                 case CHAR_FOR_CALL:
1056                         server_on_call(binary->protows, &binary->rb);
1057                         break;
1058                 case CHAR_FOR_SUBCALL_REPLY:
1059                         server_on_subcall_reply(binary->protows, &binary->rb);
1060                         break;
1061                 case CHAR_FOR_DESCRIBE:
1062                         server_on_describe(binary->protows, &binary->rb);
1063                         break;
1064                 default: /* unexpected message */
1065                         /* TODO: close the connection */
1066                         break;
1067                 }
1068         }
1069         free(binary->rb.base);
1070         free(binary);
1071 }
1072
1073 static void server_on_binary(void *closure, char *data, size_t size)
1074 {
1075         int rc;
1076         struct binary *binary;
1077
1078         if (size) {
1079                 binary = malloc(sizeof *binary);
1080                 if (!binary) {
1081                         errno = ENOMEM;
1082                 } else {
1083                         binary->protows = closure;
1084                         binary->rb.base = data;
1085                         binary->rb.head = data;
1086                         binary->rb.end = data + size;
1087                         rc = jobs_queue(NULL, 0, server_on_binary_job, binary);
1088                         if (rc >= 0)
1089                                 return;
1090                         free(binary);
1091                 }
1092         }
1093         free(data);
1094 }
1095
1096 /******************* server part: manage events **********************************/
1097
1098 static int server_event_send(struct afb_proto_ws *protows, char order, const char *event_name, int event_id, struct json_object *data)
1099 {
1100         struct writebuf wb = { .count = 0 };
1101         int rc;
1102
1103         if (writebuf_char(&wb, order)
1104          && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id))
1105          && writebuf_string(&wb, event_name)
1106          && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data))) {
1107                 pthread_mutex_lock(&protows->mutex);
1108                 rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
1109                 pthread_mutex_unlock(&protows->mutex);
1110                 if (rc >= 0)
1111                         return 0;
1112         }
1113         return -1;
1114 }
1115
1116 int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, const char *event_name, int event_id)
1117 {
1118         return server_event_send(protows, CHAR_FOR_EVT_ADD, event_name, event_id, NULL);
1119 }
1120
1121 int afb_proto_ws_server_event_remove(struct afb_proto_ws *protows, const char *event_name, int event_id)
1122 {
1123         return server_event_send(protows, CHAR_FOR_EVT_DEL, event_name, event_id, NULL);
1124 }
1125
1126 int afb_proto_ws_server_event_push(struct afb_proto_ws *protows, const char *event_name, int event_id, struct json_object *data)
1127 {
1128         return server_event_send(protows, CHAR_FOR_EVT_PUSH, event_name, event_id, data);
1129 }
1130
1131 int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char *event_name, struct json_object *data)
1132 {
1133         return server_event_send(protows, CHAR_FOR_EVT_BROADCAST, event_name, 0, data);
1134 }
1135
1136 /*****************************************************/
1137
1138 /* callback when receiving a hangup */
1139 static void on_hangup(void *closure)
1140 {
1141         struct afb_proto_ws *protows = closure;
1142         struct server_subcall *sc, *nsc;
1143         struct client_describe *cd, *ncd;
1144
1145         nsc = protows->subcalls;
1146         while (nsc) {
1147                 sc= nsc;
1148                 nsc = sc->next;
1149                 sc->callback(sc->closure, 1, NULL);
1150                 free(sc);
1151         }
1152
1153         ncd = protows->describes;
1154         while (ncd) {
1155                 cd= ncd;
1156                 ncd = cd->next;
1157                 cd->callback(cd->closure, NULL);
1158                 free(cd);
1159         }
1160
1161         if (protows->fdev) {
1162                 fdev_unref(protows->fdev);
1163                 protows->fdev = 0;
1164                 if (protows->on_hangup)
1165                         protows->on_hangup(protows->closure);
1166         }
1167 }
1168
1169 /*****************************************************/
1170
1171 static const struct afb_ws_itf proto_ws_client_ws_itf =
1172 {
1173         .on_close = NULL,
1174         .on_text = NULL,
1175         .on_binary = client_on_binary,
1176         .on_error = NULL,
1177         .on_hangup = on_hangup
1178 };
1179
1180 static const struct afb_ws_itf server_ws_itf =
1181 {
1182         .on_close = NULL,
1183         .on_text = NULL,
1184         .on_binary = server_on_binary,
1185         .on_error = NULL,
1186         .on_hangup = on_hangup
1187 };
1188
1189 /*****************************************************/
1190
1191 static struct afb_proto_ws *afb_proto_ws_create(struct fdev *fdev, const struct afb_proto_ws_server_itf *itfs, const struct afb_proto_ws_client_itf *itfc, void *closure, const struct afb_ws_itf *itf)
1192 {
1193         struct afb_proto_ws *protows;
1194
1195         protows = calloc(1, sizeof *protows);
1196         if (protows == NULL)
1197                 errno = ENOMEM;
1198         else {
1199                 fcntl(fdev_fd(fdev), F_SETFD, FD_CLOEXEC);
1200                 fcntl(fdev_fd(fdev), F_SETFL, O_NONBLOCK);
1201                 protows->ws = afb_ws_create(fdev, itf, protows);
1202                 if (protows->ws != NULL) {
1203                         protows->fdev = fdev;
1204                         protows->refcount = 1;
1205                         protows->subcalls = NULL;
1206                         protows->closure = closure;
1207                         protows->server_itf = itfs;
1208                         protows->client_itf = itfc;
1209                         pthread_mutex_init(&protows->mutex, NULL);
1210                         return protows;
1211                 }
1212                 free(protows);
1213         }
1214         return NULL;
1215 }
1216
1217 struct afb_proto_ws *afb_proto_ws_create_client(struct fdev *fdev, const struct afb_proto_ws_client_itf *itf, void *closure)
1218 {
1219         return afb_proto_ws_create(fdev, NULL, itf, closure, &proto_ws_client_ws_itf);
1220 }
1221
1222 struct afb_proto_ws *afb_proto_ws_create_server(struct fdev *fdev, const struct afb_proto_ws_server_itf *itf, void *closure)
1223 {
1224         return afb_proto_ws_create(fdev, itf, NULL, closure, &server_ws_itf);
1225 }
1226
1227 void afb_proto_ws_unref(struct afb_proto_ws *protows)
1228 {
1229         if (!__atomic_sub_fetch(&protows->refcount, 1, __ATOMIC_RELAXED)) {
1230                 afb_proto_ws_hangup(protows);
1231                 afb_ws_destroy(protows->ws);
1232                 pthread_mutex_destroy(&protows->mutex);
1233                 free(protows);
1234         }
1235 }
1236
1237 void afb_proto_ws_addref(struct afb_proto_ws *protows)
1238 {
1239         __atomic_add_fetch(&protows->refcount, 1, __ATOMIC_RELAXED);
1240 }
1241
1242 int afb_proto_ws_is_client(struct afb_proto_ws *protows)
1243 {
1244         return !!protows->client_itf;
1245 }
1246
1247 int afb_proto_ws_is_server(struct afb_proto_ws *protows)
1248 {
1249         return !!protows->server_itf;
1250 }
1251
1252 void afb_proto_ws_hangup(struct afb_proto_ws *protows)
1253 {
1254         afb_ws_hangup(protows->ws);
1255 }
1256
1257 void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangup)(void *closure))
1258 {
1259         protows->on_hangup = on_hangup;
1260 }
1261