afb-proto-ws: Improve comment and names
[src/app-framework-binder.git] / src / afb-proto-ws.c
1 /*
2  * Copyright (C) 2015-2019 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <string.h>
22 #include <assert.h>
23 #include <fcntl.h>
24 #include <unistd.h>
25 #include <errno.h>
26 #include <endian.h>
27 #include <netdb.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <sys/un.h>
31 #include <pthread.h>
32
33 #include <json-c/json.h>
34
35 #include "afb-ws.h"
36 #include "afb-msg-json.h"
37 #include "afb-proto-ws.h"
38 #include "fdev.h"
39 #include "verbose.h"
40
41 struct afb_proto_ws;
42
43 /******** implementation of internal binder protocol per api **************/
44 /*
45
46 This protocol is asymmetric: there is a client and a server
47
48 The client can require the following actions:
49
50   - call a verb
51
52   - ask for description
53
54 The server must reply to the previous actions by
55
56   - answering success or failure of the call
57
58   - answering the required description
59
60 The server can also within the context of a call
61
62   - subscribe or unsubscribe an event
63
64 For the purpose of handling events the server can:
65
66   - create/destroy an event
67
68   - push or broadcast data as an event
69
70 */
71 /************** constants for protocol definition *************************/
72
/*
 * One-character tags identifying the kind of a binary message.
 * The tag is the first byte written by the senders of this file and the
 * byte on which the receiving side dispatches. The trailing comment of
 * each line gives the direction in which the message travels.
 */
#define CHAR_FOR_CALL             'K'   /* client -> server */
#define CHAR_FOR_REPLY            'k'   /* server -> client */
#define CHAR_FOR_EVT_BROADCAST    'B'   /* server -> client */
#define CHAR_FOR_EVT_ADD          'E'   /* server -> client */
#define CHAR_FOR_EVT_DEL          'e'   /* server -> client */
#define CHAR_FOR_EVT_PUSH         'P'   /* server -> client */
#define CHAR_FOR_EVT_SUBSCRIBE    'X'   /* server -> client */
#define CHAR_FOR_EVT_UNSUBSCRIBE  'x'   /* server -> client */
#define CHAR_FOR_DESCRIBE         'D'   /* client -> server */
#define CHAR_FOR_DESCRIPTION      'd'   /* server -> client */
#define CHAR_FOR_TOKEN_ADD        'T'   /* client -> server */
#define CHAR_FOR_TOKEN_DROP       't'   /* client -> server */
#define CHAR_FOR_SESSION_ADD      'S'   /* client -> server */
#define CHAR_FOR_SESSION_DROP     's'   /* client -> server */
#define CHAR_FOR_VERSION_OFFER    'V'   /* client -> server */
#define CHAR_FOR_VERSION_SET      'v'   /* server -> client */
89
90 /******************* manage versions *****************************/
91
/*
 * Magic number sent with the version offer.
 * It is an octal literal: each group of two octal digits is the rank in
 * the alphabet of one letter of "wsapi"
 * (027=23 'w', 023=19 's', 001=1 'a', 020=16 'p', 011=9 'i').
 */
#define WSAPI_IDENTIFIER        02723012011  /* wsapi: 23.19.1.16.9 */

/* known protocol versions; UNSET is the value before any negotiation */
#define WSAPI_VERSION_UNSET     0
#define WSAPI_VERSION_1         1

/* inclusive range of the versions accepted by this implementation */
#define WSAPI_VERSION_MIN       WSAPI_VERSION_1
#define WSAPI_VERSION_MAX       WSAPI_VERSION_1

/******************* maximum count of ids ***********************/

/*
 * Upper bound of the count of identifiers (calls and describes) pending
 * at the same time on one connection; reaching it makes new requests fail
 * with EBUSY.
 */
#define ACTIVE_ID_MAX           4095
103
104 /******************* handling calls *****************************/
105
/*
 * Client side record of an emitted call.
 * The records of one connection are chained in a singly linked list and
 * are looked up by call identifier when a message refers to the call.
 */
struct client_call
{
	struct client_call *next;  /* following record of the list */
	void *request;             /* opaque request given back to the client interface */
	uint16_t callid;           /* identifier carried by the call message */
};
114
/*
 * Server side description of a received call.
 * The structure is reference counted; when the last reference is dropped
 * the protocol reference and the message buffer are released with it.
 */
struct afb_proto_ws_call
{
	struct afb_proto_ws *protows;  /* connection that received the call */
	char *buffer;                  /* received message, owned by the call */
	uint16_t refcount;             /* count of references */
	uint16_t callid;               /* identifier of the call, echoed in replies */
};
124
/*
 * Client side record of a pending describe request.
 * The callback is invoked with its closure and the received description
 * (or NULL when the description can't be decoded).
 */
struct client_describe {
	struct client_describe *next;                   /* following record of the list */
	void (*callback)(void*, struct json_object*);   /* function receiving the description */
	void *closure;                                  /* first argument of the callback */
	uint16_t descid;                                /* identifier of the request */
};
135
/*
 * Server side description of a describe request: the connection that
 * asked and the identifier to echo with the answer.
 */
struct afb_proto_ws_describe {
	struct afb_proto_ws *protows;  /* connection that received the request */
	uint16_t descid;               /* identifier of the request */
};
144
145 /******************* proto description for client or servers ******************/
146
/*
 * State of one protocol endpoint, used for both the client and the
 * server roles. The mutex guards the websocket pointer, the lists of
 * pending calls/describes and the identifier bookkeeping.
 */
struct afb_proto_ws {
	uint16_t refcount;      /* count of references */
	uint16_t genid;         /* last identifier generated */
	uint16_t idcount;       /* count of identifiers currently in use */
	uint8_t version;        /* protocol version in use */

	pthread_mutex_t mutex;  /* resource control */

	struct afb_ws *ws;      /* websocket, NULL when not connected */

	void *closure;          /* closure given to the interface callbacks */

	const struct afb_proto_ws_client_itf *client_itf;  /* the client side interface */
	const struct afb_proto_ws_server_itf *server_itf;  /* the server side interface */

	struct client_call *calls;          /* emitted calls (client side) */
	struct client_describe *describes;  /* pending descriptions (client side) */

	/* on hangup callback */
	void (*on_hangup)(void *closure);

	/* optional queuing facility for processing messages */
	int (*queuing)(struct afb_proto_ws *proto, void (*process)(int s, void *c), void *closure);
};
188
189 /******************* streaming objects **********************************/
190
/* maximum count of segments of an outgoing message */
#define WRITEBUF_COUNT_MAX      32
/* size in bytes of the internal storage for small values */
#define WRITEBUF_BUFSZ          (WRITEBUF_COUNT_MAX * sizeof(uint32_t))

/*
 * Builder of an outgoing message as a vector of segments.
 * A segment either points into 'buf' (values copied by writebuf_putbuf)
 * or directly at memory of the caller (writebuf_put), which must then
 * remain valid until the message is written.
 */
struct writebuf {
	int iovcount;                             /* count of segments in use */
	int bufcount;                             /* count of bytes used in 'buf' */
	struct iovec iovec[WRITEBUF_COUNT_MAX];   /* the segments */
	char buf[WRITEBUF_BUFSZ];                 /* storage of copied values */
};
200
/*
 * Cursor over a received message: 'base' is the start of the buffer,
 * 'head' the next byte to read and 'end' the first byte after the data.
 */
struct readbuf
{
	char *base, *head, *end;
};

/*
 * A received binary message together with the protocol endpoint that
 * received it; this is what the processing jobs get as closure.
 */
struct binary
{
	struct afb_proto_ws *protows;
	struct readbuf rb;
};

/******************* serialization part **********************************/

/*
 * Consumes 'length' bytes of the message.
 *
 * Returns the address of the consumed bytes and advances the read
 * position, or returns NULL, leaving the read position unchanged, when
 * less than 'length' bytes remain.
 */
static char *readbuf_get(struct readbuf *rb, uint32_t length)
{
	char *before = rb->head;

	/*
	 * Compare with the remaining size rather than computing
	 * 'before + length' first: for a length coming from the wire that
	 * pointer could lie far outside of the buffer, which is undefined
	 * behaviour and can wrap around on 32 bits targets.
	 */
	if ((size_t)length > (size_t)(rb->end - before))
		return NULL;

	rb->head = before + length;
	return before;
}
223
/*
 * Consumes 'length' bytes of the message and copies them at 'to'.
 * Returns 1 on success or 0, without copying, when the message is too short.
 */
static int readbuf_getat(struct readbuf *rb, void *to, uint32_t length)
{
	const char *source = readbuf_get(rb, length);

	if (source != NULL) {
		memcpy(to, source, length);
		return 1;
	}
	return 0;
}
232
/* Reads one character of the message in '*value'. Returns 1 on success, 0 otherwise. */
__attribute__((unused))
static int readbuf_char(struct readbuf *rb, char *value)
{
	int done = readbuf_getat(rb, value, sizeof(char));

	return done;
}
238
/*
 * Reads a 32 bits unsigned integer, transmitted in little endian, and
 * stores it in host order in '*value'.
 * Returns 1 on success or 0, leaving '*value' untouched, otherwise.
 */
static int readbuf_uint32(struct readbuf *rb, uint32_t *value)
{
	uint32_t wire;

	if (!readbuf_getat(rb, &wire, sizeof wire))
		return 0;
	*value = le32toh(wire);
	return 1;
}
246
/*
 * Reads a 16 bits unsigned integer, transmitted in little endian, and
 * stores it in host order in '*value'.
 * Returns 1 on success or 0, leaving '*value' untouched, otherwise.
 */
static int readbuf_uint16(struct readbuf *rb, uint16_t *value)
{
	uint16_t wire;

	if (!readbuf_getat(rb, &wire, sizeof wire))
		return 0;
	*value = le16toh(wire);
	return 1;
}
254
/* Reads one byte of the message in '*value'. Returns 1 on success, 0 otherwise. */
static int readbuf_uint8(struct readbuf *rb, uint8_t *value)
{
	int done = readbuf_getat(rb, value, sizeof(uint8_t));

	return done;
}
259
260 static int _readbuf_string_(struct readbuf *rb, const char **value, size_t *length, int nulok)
261 {
262         uint32_t len;
263         if (!readbuf_uint32(rb, &len))
264                 return 0;
265         if (!len) {
266                 if (!nulok)
267                         return 0;
268                 *value = NULL;
269                 if (length)
270                         *length = 0;
271                 return 1;
272         }
273         if (length)
274                 *length = (size_t)(len - 1);
275         return (*value = readbuf_get(rb, len)) != NULL &&  rb->head[-1] == 0;
276 }
277
278
/*
 * Reads a mandatory string: the "no string" encoding is refused.
 * See _readbuf_string_ for the meaning of 'value' and 'length'.
 */
static int readbuf_string(struct readbuf *rb, const char **value, size_t *length)
{
	const int nulok = 0;

	return _readbuf_string_(rb, value, length, nulok);
}
283
/*
 * Reads an optional string: the "no string" encoding is accepted and
 * gives NULL in '*value'.
 * See _readbuf_string_ for the meaning of 'value' and 'length'.
 */
static int readbuf_nullstring(struct readbuf *rb, const char **value, size_t *length)
{
	const int nulok = 1;

	return _readbuf_string_(rb, value, length, nulok);
}
288
289 static int readbuf_object(struct readbuf *rb, struct json_object **object)
290 {
291         const char *string;
292         struct json_object *o;
293         enum json_tokener_error jerr;
294         int rc = readbuf_string(rb, &string, NULL);
295         if (rc) {
296                 o = json_tokener_parse_verbose(string, &jerr);
297                 if (jerr != json_tokener_success)
298                         o = json_object_new_string(string);
299                 *object = o;
300         }
301         return rc;
302 }
303
304 static int writebuf_put(struct writebuf *wb, const void *value, size_t length)
305 {
306         int i = wb->iovcount;
307         if (i == WRITEBUF_COUNT_MAX)
308                 return 0;
309         wb->iovec[i].iov_base = (void*)value;
310         wb->iovec[i].iov_len = length;
311         wb->iovcount = i + 1;
312         return 1;
313 }
314
315 static int writebuf_putbuf(struct writebuf *wb, const void *value, int length)
316 {
317         char *p;
318         int i = wb->iovcount, n = wb->bufcount, nafter;
319
320         /* check enough length */
321         nafter = n + length;
322         if (nafter > WRITEBUF_BUFSZ)
323                 return 0;
324
325         /* get where to store */
326         p = &wb->buf[n];
327         if (i && p == (((char*)wb->iovec[i - 1].iov_base) + wb->iovec[i - 1].iov_len))
328                 /* increase previous iovec */
329                 wb->iovec[i - 1].iov_len += (size_t)length;
330         else if (i == WRITEBUF_COUNT_MAX)
331                 /* no more iovec */
332                 return 0;
333         else {
334                 /* new iovec */
335                 wb->iovec[i].iov_base = p;
336                 wb->iovec[i].iov_len = (size_t)length;
337                 wb->iovcount = i + 1;
338         }
339         /* store now */
340         memcpy(p, value, (size_t)length);
341         wb->bufcount = nafter;
342         return 1;
343 }
344
/* Appends a copy of one character. Returns 1 on success, 0 otherwise. */
__attribute__((unused))
static int writebuf_char(struct writebuf *wb, char value)
{
	char byte = value;

	return writebuf_putbuf(wb, &byte, (int)sizeof byte);
}
350
/*
 * Appends a 32 bits unsigned integer, transmitted in little endian.
 * Returns 1 on success, 0 otherwise.
 */
static int writebuf_uint32(struct writebuf *wb, uint32_t value)
{
	const uint32_t wire = htole32(value);

	return writebuf_putbuf(wb, &wire, (int)sizeof wire);
}
356
/*
 * Appends a 16 bits unsigned integer, transmitted in little endian.
 * Returns 1 on success, 0 otherwise.
 */
static int writebuf_uint16(struct writebuf *wb, uint16_t value)
{
	const uint16_t wire = htole16(value);

	return writebuf_putbuf(wb, &wire, (int)sizeof wire);
}
362
/* Appends a copy of one byte. Returns 1 on success, 0 otherwise. */
static int writebuf_uint8(struct writebuf *wb, uint8_t value)
{
	uint8_t byte = value;

	return writebuf_putbuf(wb, &byte, (int)sizeof byte);
}
367
/*
 * Appends the string 'value' of the given 'length' (terminating zero not
 * counted): its size, terminating zero included, as a 32 bits integer
 * followed by a segment referencing the string itself (not copied).
 * Returns 1 on success or 0 when the size doesn't fit 32 bits or when
 * the buffer is exhausted.
 */
static int writebuf_string_length(struct writebuf *wb, const char *value, size_t length)
{
	size_t total = length + 1; /* the terminating zero is transmitted */
	uint32_t wire = (uint32_t)total;

	/* refuse sizes that are truncated or wrapped to zero */
	if ((size_t)wire != total || wire == 0)
		return 0;
	if (!writebuf_uint32(wb, wire))
		return 0;
	return writebuf_put(wb, value, total);
}
373
/*
 * Appends the zero terminated string 'value', that must not be NULL.
 * Returns 1 on success, 0 otherwise.
 */
static int writebuf_string(struct writebuf *wb, const char *value)
{
	size_t length = strlen(value);

	return writebuf_string_length(wb, value, length);
}
378
/*
 * Appends the optional string 'value': NULL is transmitted as a size of
 * zero, any other value as a regular string.
 * Returns 1 on success, 0 otherwise.
 */
static int writebuf_nullstring(struct writebuf *wb, const char *value)
{
	if (value == NULL)
		return writebuf_uint32(wb, 0);
	return writebuf_string_length(wb, value, strlen(value));
}
383
384 static int writebuf_object(struct writebuf *wb, struct json_object *object)
385 {
386         const char *string = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
387         return string != NULL && writebuf_string(wb, string);
388 }
389
390 /******************* queuing of messages *****************/
391
392 /* queue the processing of the received message (except if size=0 cause it's not a valid message) */
393 static void queue_message_processing(struct afb_proto_ws *protows, char *data, size_t size, void (*processing)(int,void*))
394 {
395         struct binary *binary;
396
397         if (size) {
398                 binary = malloc(sizeof *binary);
399                 if (!binary) {
400                         /* TODO process the problem */
401                         errno = ENOMEM;
402                 } else {
403                         binary->protows = protows;
404                         binary->rb.base = data;
405                         binary->rb.head = data;
406                         binary->rb.end = data + size;
407                         if (!protows->queuing
408                          || protows->queuing(protows, processing, binary) < 0)
409                                 processing(0, binary);
410                         return;
411                 }
412         }
413         free(data);
414 }
415
416 /******************* sending messages *****************/
417
418 static int proto_write(struct afb_proto_ws *protows, struct writebuf *wb)
419 {
420         int rc;
421         struct afb_ws *ws;
422
423         pthread_mutex_lock(&protows->mutex);
424         ws = protows->ws;
425         if (ws == NULL) {
426                 errno = EPIPE;
427                 rc = -1;
428         } else {
429                 rc = afb_ws_binary_v(ws, wb->iovec, wb->iovcount);
430                 if (rc > 0)
431                         rc = 0;
432         }
433         pthread_mutex_unlock(&protows->mutex);
434         return rc;
435 }
436
437 static int send_version_offer_1(struct afb_proto_ws *protows, uint8_t version)
438 {
439         int rc = -1;
440         struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
441
442         if (writebuf_char(&wb, CHAR_FOR_VERSION_OFFER)
443          && writebuf_uint32(&wb, WSAPI_IDENTIFIER)
444          && writebuf_uint8(&wb, 1) /* offer one version */
445          && writebuf_uint8(&wb, version))
446                 rc = proto_write(protows, &wb);
447         return rc;
448 }
449
450 static int send_version_set(struct afb_proto_ws *protows, uint8_t version)
451 {
452         int rc = -1;
453         struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
454
455         if (writebuf_char(&wb, CHAR_FOR_VERSION_SET)
456          && writebuf_uint8(&wb, version))
457                 rc = proto_write(protows, &wb);
458         return rc;
459 }
460
461 /******************* ws request part for server *****************/
462
463 void afb_proto_ws_call_addref(struct afb_proto_ws_call *call)
464 {
465         __atomic_add_fetch(&call->refcount, 1, __ATOMIC_RELAXED);
466 }
467
468 void afb_proto_ws_call_unref(struct afb_proto_ws_call *call)
469 {
470         if (__atomic_sub_fetch(&call->refcount, 1, __ATOMIC_RELAXED))
471                 return;
472
473         afb_proto_ws_unref(call->protows);
474         free(call->buffer);
475         free(call);
476 }
477
478 int afb_proto_ws_call_reply(struct afb_proto_ws_call *call, struct json_object *obj, const char *error, const char *info)
479 {
480         int rc = -1;
481         struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
482         struct afb_proto_ws *protows = call->protows;
483
484         if (writebuf_char(&wb, CHAR_FOR_REPLY)
485          && writebuf_uint16(&wb, call->callid)
486          && writebuf_nullstring(&wb, error)
487          && writebuf_nullstring(&wb, info)
488          && writebuf_object(&wb, obj))
489                 rc = proto_write(protows, &wb);
490         return rc;
491 }
492
493 int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, uint16_t event_id)
494 {
495         int rc = -1;
496         struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
497         struct afb_proto_ws *protows = call->protows;
498
499         if (writebuf_char(&wb, CHAR_FOR_EVT_SUBSCRIBE)
500          && writebuf_uint16(&wb, call->callid)
501          && writebuf_uint16(&wb, event_id))
502                 rc = proto_write(protows, &wb);
503         return rc;
504 }
505
506 int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, uint16_t event_id)
507 {
508         int rc = -1;
509         struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
510         struct afb_proto_ws *protows = call->protows;
511
512         if (writebuf_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE)
513          && writebuf_uint16(&wb, call->callid)
514          && writebuf_uint16(&wb, event_id))
515                 rc = proto_write(protows, &wb);
516         return rc;
517 }
518
519 /******************* client part **********************************/
520
521 /* search a memorized call */
522 static struct client_call *client_call_search_locked(struct afb_proto_ws *protows, uint16_t callid)
523 {
524         struct client_call *call;
525
526         call = protows->calls;
527         while (call != NULL && call->callid != callid)
528                 call = call->next;
529
530         return call;
531 }
532
533 static struct client_call *client_call_search_unlocked(struct afb_proto_ws *protows, uint16_t callid)
534 {
535         struct client_call *result;
536
537         pthread_mutex_lock(&protows->mutex);
538         result = client_call_search_locked(protows, callid);
539         pthread_mutex_unlock(&protows->mutex);
540         return result;
541 }
542
543 /* free and release the memorizing call */
544 static void client_call_destroy(struct afb_proto_ws *protows, struct client_call *call)
545 {
546         struct client_call **prv;
547
548         pthread_mutex_lock(&protows->mutex);
549         prv = &protows->calls;
550         while (*prv != NULL) {
551                 if (*prv == call) {
552                         protows->idcount--;
553                         *prv = call->next;
554                         pthread_mutex_unlock(&protows->mutex);
555                         free(call);
556                         return;
557                 }
558                 prv = &(*prv)->next;
559         }
560         pthread_mutex_unlock(&protows->mutex);
561 }
562
/*
 * Reads the identifier of a call from the message and retrieves the
 * matching recorded call in '*call'.
 * Returns 1 when the call is found; returns 0 when the identifier can't
 * be read ('*call' is then untouched) or matches no recorded call
 * ('*call' is then NULL).
 */
static int client_msg_call_get(struct afb_proto_ws *protows, struct readbuf *rb, struct client_call **call)
{
	uint16_t callid;
	struct client_call *found;

	/* get the identifier of the call from the message */
	if (!readbuf_uint16(rb, &callid))
		return 0;

	/* get the recorded call */
	found = client_call_search_unlocked(protows, callid);
	*call = found;
	return found != NULL;
}
576
577 /* adds an event */
578 static void client_on_event_create(struct afb_proto_ws *protows, struct readbuf *rb)
579 {
580         const char *event_name;
581         uint16_t event_id;
582
583         if (protows->client_itf->on_event_create 
584                         && readbuf_uint16(rb, &event_id)
585                         && readbuf_string(rb, &event_name, NULL))
586                 protows->client_itf->on_event_create(protows->closure, event_id, event_name);
587         else
588                 ERROR("Ignoring creation of event");
589 }
590
591 /* removes an event */
592 static void client_on_event_remove(struct afb_proto_ws *protows, struct readbuf *rb)
593 {
594         uint16_t event_id;
595
596         if (protows->client_itf->on_event_remove && readbuf_uint16(rb, &event_id))
597                 protows->client_itf->on_event_remove(protows->closure, event_id);
598         else
599                 ERROR("Ignoring deletion of event");
600 }
601
602 /* subscribes an event */
603 static void client_on_event_subscribe(struct afb_proto_ws *protows, struct readbuf *rb)
604 {
605         uint16_t event_id;
606         struct client_call *call;
607
608         if (protows->client_itf->on_event_subscribe && client_msg_call_get(protows, rb, &call) && readbuf_uint16(rb, &event_id))
609                 protows->client_itf->on_event_subscribe(protows->closure, call->request, event_id);
610         else
611                 ERROR("Ignoring subscription to event");
612 }
613
614 /* unsubscribes an event */
615 static void client_on_event_unsubscribe(struct afb_proto_ws *protows, struct readbuf *rb)
616 {
617         uint16_t event_id;
618         struct client_call *call;
619
620         if (protows->client_itf->on_event_unsubscribe && client_msg_call_get(protows, rb, &call) && readbuf_uint16(rb, &event_id))
621                 protows->client_itf->on_event_unsubscribe(protows->closure, call->request, event_id);
622         else
623                 ERROR("Ignoring unsubscription to event");
624 }
625
626 /* receives broadcasted events */
627 static void client_on_event_broadcast(struct afb_proto_ws *protows, struct readbuf *rb)
628 {
629         const char *event_name, *uuid;
630         uint8_t hop;
631         struct json_object *object;
632
633         if (protows->client_itf->on_event_broadcast && readbuf_string(rb, &event_name, NULL) && readbuf_object(rb, &object) && (uuid = readbuf_get(rb, 16)) && readbuf_uint8(rb, &hop))
634                 protows->client_itf->on_event_broadcast(protows->closure, event_name, object, (unsigned char*)uuid, hop);
635         else
636                 ERROR("Ignoring broadcast of event");
637 }
638
639 /* pushs an event */
640 static void client_on_event_push(struct afb_proto_ws *protows, struct readbuf *rb)
641 {
642         uint16_t event_id;
643         struct json_object *object;
644
645         if (protows->client_itf->on_event_push && readbuf_uint16(rb, &event_id) && readbuf_object(rb, &object))
646                 protows->client_itf->on_event_push(protows->closure, event_id, object);
647         else
648                 ERROR("Ignoring push of event");
649 }
650
651 static void client_on_reply(struct afb_proto_ws *protows, struct readbuf *rb)
652 {
653         struct client_call *call;
654         struct json_object *object;
655         const char *error, *info;
656
657         if (!client_msg_call_get(protows, rb, &call))
658                 return;
659
660         if (readbuf_nullstring(rb, &error, NULL) && readbuf_nullstring(rb, &info, NULL) && readbuf_object(rb, &object)) {
661                 protows->client_itf->on_reply(protows->closure, call->request, object, error, info);
662         } else {
663                 protows->client_itf->on_reply(protows->closure, call->request, NULL, "proto-error", "can't process success");
664         }
665         client_call_destroy(protows, call);
666 }
667
668 static void client_on_description(struct afb_proto_ws *protows, struct readbuf *rb)
669 {
670         uint32_t descid;
671         struct client_describe *desc, **prv;
672         struct json_object *object;
673
674         if (readbuf_uint32(rb, &descid)) {
675                 pthread_mutex_lock(&protows->mutex);
676                 prv = &protows->describes;
677                 while ((desc = *prv) && desc->descid != descid)
678                         prv = &desc->next;
679                 if (!desc)
680                         pthread_mutex_unlock(&protows->mutex);
681                 else {
682                         *prv = desc->next;
683                         protows->idcount--;
684                         pthread_mutex_unlock(&protows->mutex);
685                         if (!readbuf_object(rb, &object))
686                                 object = NULL;
687                         desc->callback(desc->closure, object);
688                         free(desc);
689                 }
690         }
691 }
692
693 /* received a version set */
694 static void client_on_version_set(struct afb_proto_ws *protows, struct readbuf *rb)
695 {
696         uint8_t version;
697
698         /* reads the descid */
699         if (readbuf_uint8(rb, &version)
700          && WSAPI_VERSION_MIN <= version
701          && version <= WSAPI_VERSION_MAX) {
702                 protows->version = version;
703                 return;
704         }
705         afb_proto_ws_hangup(protows);
706 }
707
708
709 /* callback when receiving binary data */
710 static void client_on_binary_job(int sig, void *closure)
711 {
712         struct binary *binary = closure;
713
714         if (!sig) {
715                 switch (*binary->rb.head++) {
716                 case CHAR_FOR_REPLY: /* reply */
717                         client_on_reply(binary->protows, &binary->rb);
718                         break;
719                 case CHAR_FOR_EVT_BROADCAST: /* broadcast */
720                         client_on_event_broadcast(binary->protows, &binary->rb);
721                         break;
722                 case CHAR_FOR_EVT_ADD: /* creates the event */
723                         client_on_event_create(binary->protows, &binary->rb);
724                         break;
725                 case CHAR_FOR_EVT_DEL: /* removes the event */
726                         client_on_event_remove(binary->protows, &binary->rb);
727                         break;
728                 case CHAR_FOR_EVT_PUSH: /* pushs the event */
729                         client_on_event_push(binary->protows, &binary->rb);
730                         break;
731                 case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
732                         client_on_event_subscribe(binary->protows, &binary->rb);
733                         break;
734                 case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
735                         client_on_event_unsubscribe(binary->protows, &binary->rb);
736                         break;
737                 case CHAR_FOR_DESCRIPTION: /* description */
738                         client_on_description(binary->protows, &binary->rb);
739                         break;
740                 case CHAR_FOR_VERSION_SET: /* set the protocol version */
741                         client_on_version_set(binary->protows, &binary->rb);
742                         break;
743                 default: /* unexpected message */
744                         /* TODO: close the connection */
745                         break;
746                 }
747         }
748         free(binary->rb.base);
749         free(binary);
750 }
751
752 /* callback when receiving binary data */
753 static void client_on_binary(void *closure, char *data, size_t size)
754 {
755         struct afb_proto_ws *protows = closure;
756
757         queue_message_processing(protows, data, size, client_on_binary_job);
758 }
759
760 static int client_send_cmd_id16_optstr(struct afb_proto_ws *protows, char order, uint16_t id, const char *value)
761 {
762         struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
763         int rc = -1;
764
765         if (writebuf_char(&wb, order)
766          && writebuf_uint16(&wb, id)
767          && (!value || writebuf_string(&wb, value)))
768                 rc = proto_write(protows, &wb);
769         return rc;
770 }
771
772 int afb_proto_ws_client_session_create(struct afb_proto_ws *protows, uint16_t sessionid, const char *sessionstr)
773 {
774         return client_send_cmd_id16_optstr(protows, CHAR_FOR_SESSION_ADD, sessionid, sessionstr);
775 }
776
777 int afb_proto_ws_client_session_remove(struct afb_proto_ws *protows, uint16_t sessionid)
778 {
779         return client_send_cmd_id16_optstr(protows, CHAR_FOR_SESSION_DROP, sessionid, NULL);
780 }
781
782 int afb_proto_ws_client_token_create(struct afb_proto_ws *protows, uint16_t tokenid, const char *tokenstr)
783 {
784         return client_send_cmd_id16_optstr(protows, CHAR_FOR_TOKEN_ADD, tokenid, tokenstr);
785
786 }
787
788 int afb_proto_ws_client_token_remove(struct afb_proto_ws *protows, uint16_t tokenid)
789 {
790         return client_send_cmd_id16_optstr(protows, CHAR_FOR_TOKEN_DROP, tokenid, NULL);
791 }
792
793 int afb_proto_ws_client_call(
794                 struct afb_proto_ws *protows,
795                 const char *verb,
796                 struct json_object *args,
797                 uint16_t sessionid,
798                 uint16_t tokenid,
799                 void *request,
800                 const char *user_creds
801 )
802 {
803         int rc = -1;
804         struct client_call *call;
805         struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
806         uint16_t id;
807
808         /* allocate call data */
809         call = malloc(sizeof *call);
810         if (call == NULL) {
811                 errno = ENOMEM;
812                 return -1;
813         }
814         call->request = request;
815
816         /* init call data */
817         pthread_mutex_lock(&protows->mutex);
818         if (protows->idcount >= ACTIVE_ID_MAX) {
819                 pthread_mutex_unlock(&protows->mutex);
820                 errno = EBUSY;
821                 goto clean;
822         }
823         protows->idcount++;
824         id = ++protows->genid;
825         while(!id || client_call_search_locked(protows, id) != NULL)
826                 id++;
827         call->callid = protows->genid = id;
828         call->next = protows->calls;
829         protows->calls = call;
830         pthread_mutex_unlock(&protows->mutex);
831
832         /* creates the call message */
833         if (!writebuf_char(&wb, CHAR_FOR_CALL)
834          || !writebuf_uint16(&wb, call->callid)
835          || !writebuf_string(&wb, verb)
836          || !writebuf_uint16(&wb, sessionid)
837          || !writebuf_uint16(&wb, tokenid)
838          || !writebuf_object(&wb, args)
839          || !writebuf_nullstring(&wb, user_creds)) {
840                 errno = EINVAL;
841                 goto clean;
842         }
843
844         /* send */
845         rc = proto_write(protows, &wb);
846         if (!rc)
847                 goto end;
848
849 clean:
850         client_call_destroy(protows, call);
851 end:
852         return rc;
853 }
854
855 /* get the description */
856 int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(void*, struct json_object*), void *closure)
857 {
858         struct client_describe *desc, *d;
859         struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
860         uint16_t id;
861
862         desc = malloc(sizeof *desc);
863         if (!desc) {
864                 errno = ENOMEM;
865                 goto error;
866         }
867
868         /* fill in stack the description of the task */
869         pthread_mutex_lock(&protows->mutex);
870         if (protows->idcount >= ACTIVE_ID_MAX) {
871                 errno = EBUSY;
872                 goto busy;
873         }
874         protows->idcount++;
875         id = ++protows->genid;
876         d = protows->describes;
877         while (d) {
878                 if (id && d->descid != id)
879                         d = d->next;
880                 else {
881                         id++;
882                         d = protows->describes;
883                 }
884         }
885         desc->descid = protows->genid = id;
886         desc->callback = callback;
887         desc->closure = closure;
888         desc->next = protows->describes;
889         protows->describes = desc;
890         pthread_mutex_unlock(&protows->mutex);
891
892         /* send */
893         if (!writebuf_char(&wb, CHAR_FOR_DESCRIBE)
894          || !writebuf_uint16(&wb, desc->descid)) {
895                  errno = EINVAL;
896                  goto error2;
897         }
898
899         if (proto_write(protows, &wb) == 0)
900                 return 0;
901
902 error2:
903         pthread_mutex_lock(&protows->mutex);
904         d = protows->describes;
905         if (d == desc)
906                 protows->describes = desc->next;
907         else {
908                 while(d && d->next != desc)
909                         d = d->next;
910                 if (d)
911                         d->next = desc->next;
912         }
913         protows->idcount--;
914 busy:
915         pthread_mutex_unlock(&protows->mutex);
916         free(desc);
917 error:
918         /* TODO? callback(closure, NULL); */
919         return -1;
920 }
921
922 /******************* client description part for server *****************************/
923
924 /* on call, propagate it to the ws service */
925 static void server_on_call(struct afb_proto_ws *protows, struct readbuf *rb)
926 {
927         struct afb_proto_ws_call *call;
928         const char *verb, *user_creds;
929         uint16_t callid, sessionid, tokenid;
930         size_t lenverb;
931         struct json_object *object;
932
933         afb_proto_ws_addref(protows);
934
935         /* reads the call message data */
936         if (!readbuf_uint16(rb, &callid)
937          || !readbuf_string(rb, &verb, &lenverb)
938          || !readbuf_uint16(rb, &sessionid)
939          || !readbuf_uint16(rb, &tokenid)
940          || !readbuf_object(rb, &object)
941          || !readbuf_nullstring(rb, &user_creds, NULL))
942                 goto overflow;
943
944         /* create the request */
945         call = malloc(sizeof *call);
946         if (call == NULL)
947                 goto out_of_memory;
948
949         call->protows = protows;
950         call->callid = callid;
951         call->refcount = 1;
952         call->buffer = rb->base;
953         rb->base = NULL; /* don't free the buffer */
954
955         protows->server_itf->on_call(protows->closure, call, verb, object, sessionid, tokenid, user_creds);
956         return;
957
958 out_of_memory:
959         json_object_put(object);
960
961 overflow:
962         afb_proto_ws_unref(protows);
963 }
964
965 static int server_send_description(struct afb_proto_ws *protows, uint32_t descid, struct json_object *descobj)
966 {
967         int rc = -1;
968         struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
969
970         if (writebuf_char(&wb, CHAR_FOR_DESCRIPTION)
971          && writebuf_uint32(&wb, descid)
972          && writebuf_object(&wb, descobj))
973                 rc = proto_write(protows, &wb);
974         return rc;
975 }
976
977 int afb_proto_ws_describe_put(struct afb_proto_ws_describe *describe, struct json_object *description)
978 {
979         int rc = server_send_description(describe->protows, describe->descid, description);
980         afb_proto_ws_unref(describe->protows);
981         free(describe);
982         return rc;
983 }
984
985 /* on describe, propagate it to the ws service */
986 static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb)
987 {
988         uint16_t descid;
989         struct afb_proto_ws_describe *desc;
990
991         /* reads the descid */
992         if (readbuf_uint16(rb, &descid)) {
993                 if (protows->server_itf->on_describe) {
994                         /* create asynchronous job */
995                         desc = malloc(sizeof *desc);
996                         if (desc) {
997                                 desc->descid = descid;
998                                 desc->protows = protows;
999                                 afb_proto_ws_addref(protows);
1000                                 protows->server_itf->on_describe(protows->closure, desc);
1001                                 return;
1002                         }
1003                 }
1004                 server_send_description(protows, descid, NULL);
1005         }
1006 }
1007
1008 static void server_on_session_add(struct afb_proto_ws *protows, struct readbuf *rb)
1009 {
1010         uint16_t sessionid;
1011         const char *sessionstr;
1012
1013         if (readbuf_uint16(rb, &sessionid) && readbuf_string(rb, &sessionstr, NULL))
1014                 protows->server_itf->on_session_create(protows->closure, sessionid, sessionstr);
1015 }
1016
1017 static void server_on_session_drop(struct afb_proto_ws *protows, struct readbuf *rb)
1018 {
1019         uint16_t sessionid;
1020
1021         if (readbuf_uint16(rb, &sessionid))
1022                 protows->server_itf->on_session_remove(protows->closure, sessionid);
1023 }
1024
1025 static void server_on_token_add(struct afb_proto_ws *protows, struct readbuf *rb)
1026 {
1027         uint16_t tokenid;
1028         const char *tokenstr;
1029
1030         if (readbuf_uint16(rb, &tokenid) && readbuf_string(rb, &tokenstr, NULL))
1031                 protows->server_itf->on_token_create(protows->closure, tokenid, tokenstr);
1032 }
1033
1034 static void server_on_token_drop(struct afb_proto_ws *protows, struct readbuf *rb)
1035 {
1036         uint16_t tokenid;
1037
1038         if (readbuf_uint16(rb, &tokenid))
1039                 protows->server_itf->on_token_remove(protows->closure, tokenid);
1040 }
1041
1042 /* on version offer */
1043 static void server_on_version_offer(struct afb_proto_ws *protows, struct readbuf *rb)
1044 {
1045         uint8_t count;
1046         uint8_t *versions;
1047         uint8_t version;
1048         uint8_t v;
1049         uint32_t id;
1050
1051         /* reads the descid */
1052         if (readbuf_uint32(rb, &id)
1053                 && id == WSAPI_IDENTIFIER
1054                 && readbuf_uint8(rb, &count)
1055                 && count > 0
1056                 && (versions = (uint8_t*)readbuf_get(rb, (uint32_t)count))) {
1057                 version = WSAPI_VERSION_UNSET;
1058                 while (count) {
1059                         v = versions[--count];
1060                         if (v >= WSAPI_VERSION_MIN
1061                          && v <= WSAPI_VERSION_MAX
1062                          && (version == WSAPI_VERSION_UNSET || version < v))
1063                                 version = v;
1064                 }
1065                 if (version != WSAPI_VERSION_UNSET) {
1066                         if (send_version_set(protows, version) >= 0) {
1067                                 protows->version = version;
1068                                 return;
1069                         }
1070                 }
1071         }
1072         afb_proto_ws_hangup(protows);
1073 }
1074
1075 /* callback when receiving binary data */
1076 static void server_on_binary_job(int sig, void *closure)
1077 {
1078         struct binary *binary = closure;
1079
1080         if (!sig) {
1081                 switch (*binary->rb.head++) {
1082                 case CHAR_FOR_CALL:
1083                         server_on_call(binary->protows, &binary->rb);
1084                         break;
1085                 case CHAR_FOR_DESCRIBE:
1086                         server_on_describe(binary->protows, &binary->rb);
1087                         break;
1088                 case CHAR_FOR_SESSION_ADD:
1089                         server_on_session_add(binary->protows, &binary->rb);
1090                         break;
1091                 case CHAR_FOR_SESSION_DROP:
1092                         server_on_session_drop(binary->protows, &binary->rb);
1093                         break;
1094                 case CHAR_FOR_TOKEN_ADD:
1095                         server_on_token_add(binary->protows, &binary->rb);
1096                         break;
1097                 case CHAR_FOR_TOKEN_DROP:
1098                         server_on_token_drop(binary->protows, &binary->rb);
1099                         break;
1100                 case CHAR_FOR_VERSION_OFFER:
1101                         server_on_version_offer(binary->protows, &binary->rb);
1102                         break;
1103                 default: /* unexpected message */
1104                         /* TODO: close the connection */
1105                         break;
1106                 }
1107         }
1108         free(binary->rb.base);
1109         free(binary);
1110 }
1111
1112 static void server_on_binary(void *closure, char *data, size_t size)
1113 {
1114         struct afb_proto_ws *protows = closure;
1115
1116         queue_message_processing(protows, data, size, server_on_binary_job);
1117 }
1118
1119 /******************* server part: manage events **********************************/
1120
1121 static int server_event_send(struct afb_proto_ws *protows, char order, uint16_t event_id, const char *event_name, struct json_object *data)
1122 {
1123         struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
1124         int rc = -1;
1125
1126         if (writebuf_char(&wb, order)
1127          && writebuf_uint16(&wb, event_id)
1128          && (order != CHAR_FOR_EVT_ADD || writebuf_string(&wb, event_name))
1129          && (order != CHAR_FOR_EVT_PUSH || writebuf_object(&wb, data)))
1130                 rc = proto_write(protows, &wb);
1131         return rc;
1132 }
1133
1134 int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, uint16_t event_id, const char *event_name)
1135 {
1136         return server_event_send(protows, CHAR_FOR_EVT_ADD, event_id, event_name, NULL);
1137 }
1138
1139 int afb_proto_ws_server_event_remove(struct afb_proto_ws *protows, uint16_t event_id)
1140 {
1141         return server_event_send(protows, CHAR_FOR_EVT_DEL, event_id, NULL, NULL);
1142 }
1143
1144 int afb_proto_ws_server_event_push(struct afb_proto_ws *protows, uint16_t event_id, struct json_object *data)
1145 {
1146         return server_event_send(protows, CHAR_FOR_EVT_PUSH, event_id, NULL, data);
1147 }
1148
1149 int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char *event_name, struct json_object *data, const unsigned char uuid[16], uint8_t hop)
1150 {
1151         struct writebuf wb = { .iovcount = 0, .bufcount = 0 };
1152         int rc = -1;
1153
1154         if (!hop)
1155                 return 0;
1156
1157         if (writebuf_char(&wb, CHAR_FOR_EVT_BROADCAST)
1158          && writebuf_string(&wb, event_name)
1159          && writebuf_object(&wb, data)
1160          && writebuf_put(&wb, uuid, 16)
1161          && writebuf_uint8(&wb, (uint8_t)(hop - 1)))
1162                 rc = proto_write(protows, &wb);
1163         return rc;
1164 }
1165
1166 /*****************************************************/
1167
1168 /* callback when receiving a hangup */
1169 static void on_hangup(void *closure)
1170 {
1171         struct afb_proto_ws *protows = closure;
1172         struct client_describe *cd, *ncd;
1173         struct client_call *call, *ncall;
1174         struct afb_ws *ws;
1175
1176         pthread_mutex_lock(&protows->mutex);
1177         ncd = protows->describes;
1178         protows->describes = NULL;
1179         ncall = protows->calls;
1180         protows->calls = NULL;
1181         ws = protows->ws;
1182         protows->ws = NULL;
1183         protows->idcount = 0;
1184         pthread_mutex_unlock(&protows->mutex);
1185
1186         while (ncall) {
1187                 call= ncall;
1188                 ncall = call->next;
1189                 protows->client_itf->on_reply(protows->closure, call->request, NULL, "disconnected", "server hung up");
1190                 free(call);
1191         }
1192
1193         while (ncd) {
1194                 cd= ncd;
1195                 ncd = cd->next;
1196                 cd->callback(cd->closure, NULL);
1197                 free(cd);
1198         }
1199
1200         if (ws) {
1201                 afb_ws_destroy(ws);
1202                 if (protows->on_hangup)
1203                         protows->on_hangup(protows->closure);
1204         }
1205 }
1206
1207 /*****************************************************/
1208
1209 static const struct afb_ws_itf proto_ws_client_ws_itf =
1210 {
1211         .on_close = NULL,
1212         .on_text = NULL,
1213         .on_binary = client_on_binary,
1214         .on_error = NULL,
1215         .on_hangup = on_hangup
1216 };
1217
1218 static const struct afb_ws_itf server_ws_itf =
1219 {
1220         .on_close = NULL,
1221         .on_text = NULL,
1222         .on_binary = server_on_binary,
1223         .on_error = NULL,
1224         .on_hangup = on_hangup
1225 };
1226
1227 /*****************************************************/
1228
1229 static struct afb_proto_ws *afb_proto_ws_create(struct fdev *fdev, const struct afb_proto_ws_server_itf *itfs, const struct afb_proto_ws_client_itf *itfc, void *closure, const struct afb_ws_itf *itf)
1230 {
1231         struct afb_proto_ws *protows;
1232
1233         protows = calloc(1, sizeof *protows);
1234         if (protows == NULL)
1235                 errno = ENOMEM;
1236         else {
1237                 fcntl(fdev_fd(fdev), F_SETFD, FD_CLOEXEC);
1238                 fcntl(fdev_fd(fdev), F_SETFL, O_NONBLOCK);
1239                 protows->ws = afb_ws_create(fdev, itf, protows);
1240                 if (protows->ws != NULL) {
1241                         protows->refcount = 1;
1242                         protows->version = WSAPI_VERSION_UNSET;
1243                         protows->closure = closure;
1244                         protows->server_itf = itfs;
1245                         protows->client_itf = itfc;
1246                         pthread_mutex_init(&protows->mutex, NULL);
1247                         return protows;
1248                 }
1249                 free(protows);
1250         }
1251         return NULL;
1252 }
1253
1254 struct afb_proto_ws *afb_proto_ws_create_client(struct fdev *fdev, const struct afb_proto_ws_client_itf *itf, void *closure)
1255 {
1256         struct afb_proto_ws *protows;
1257
1258         protows = afb_proto_ws_create(fdev, NULL, itf, closure, &proto_ws_client_ws_itf);
1259         if (protows) {
1260                 if (send_version_offer_1(protows, WSAPI_VERSION_1) != 0) {
1261                         afb_proto_ws_unref(protows);
1262                         protows = NULL;
1263                 }
1264         }
1265         return protows;
1266 }
1267
1268 struct afb_proto_ws *afb_proto_ws_create_server(struct fdev *fdev, const struct afb_proto_ws_server_itf *itf, void *closure)
1269 {
1270         return afb_proto_ws_create(fdev, itf, NULL, closure, &server_ws_itf);
1271 }
1272
1273 void afb_proto_ws_unref(struct afb_proto_ws *protows)
1274 {
1275         if (protows && !__atomic_sub_fetch(&protows->refcount, 1, __ATOMIC_RELAXED)) {
1276                 afb_proto_ws_hangup(protows);
1277                 pthread_mutex_destroy(&protows->mutex);
1278                 free(protows);
1279         }
1280 }
1281
1282 void afb_proto_ws_addref(struct afb_proto_ws *protows)
1283 {
1284         __atomic_add_fetch(&protows->refcount, 1, __ATOMIC_RELAXED);
1285 }
1286
1287 int afb_proto_ws_is_client(struct afb_proto_ws *protows)
1288 {
1289         return !!protows->client_itf;
1290 }
1291
1292 int afb_proto_ws_is_server(struct afb_proto_ws *protows)
1293 {
1294         return !!protows->server_itf;
1295 }
1296
1297 void afb_proto_ws_hangup(struct afb_proto_ws *protows)
1298 {
1299         if (protows->ws)
1300                 afb_ws_hangup(protows->ws);
1301 }
1302
1303 void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangup)(void *closure))
1304 {
1305         protows->on_hangup = on_hangup;
1306 }
1307
1308 void afb_proto_ws_set_queuing(struct afb_proto_ws *protows, int (*queuing)(struct afb_proto_ws*, void (*)(int,void*), void*))
1309 {
1310         protows->queuing = queuing;
1311 }