X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?p=src%2Fapp-framework-binder.git;a=blobdiff_plain;f=src%2Fafb-proto-ws.c;h=0e102308b19bd8afd19c4e8f994dfa11082f3e52;hp=ce7d75d3151916565954d994fb108be7fe7fb71f;hb=65353dce81a629e042800bb7b86fcd869a76727e;hpb=ba1f3e26cb5f5f3e95480cb5c6a519a87c4c5d88 diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c index ce7d75d3..0e102308 100644 --- a/src/afb-proto-ws.c +++ b/src/afb-proto-ws.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015, 2016, 2017 "IoT.bzh" + * Copyright (C) 2015-2020 "IoT.bzh" * Author José Bollo * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,8 +17,6 @@ #define _GNU_SOURCE -#define NO_PLUGIN_VERBOSE_MACRO - #include #include #include @@ -37,14 +35,15 @@ #include "afb-ws.h" #include "afb-msg-json.h" #include "afb-proto-ws.h" -#include "jobs.h" +#include "fdev.h" +#include "verbose.h" struct afb_proto_ws; /******** implementation of internal binder protocol per api **************/ /* -This protocol is asymetric: there is a client and a server +This protocol is asymmetric: there is a client and a server The client can require the following actions: @@ -60,76 +59,70 @@ The server must reply to the previous actions by The server can also within the context of a call - - make a subcall - - subscribe or unsubscribe an event For the purpose of handling events the server can: - create/destroy an event - - push or brodcast data as an event + - push or broadcast data as an event + + - signal unexpected event */ /************** constants for protocol definition *************************/ -#define CHAR_FOR_CALL 'C' -#define CHAR_FOR_ANSWER_SUCCESS 'T' -#define CHAR_FOR_ANSWER_FAIL 'F' -#define CHAR_FOR_EVT_BROADCAST '*' -#define CHAR_FOR_EVT_ADD '+' -#define CHAR_FOR_EVT_DEL '-' -#define CHAR_FOR_EVT_PUSH '!' -#define CHAR_FOR_EVT_SUBSCRIBE 'S' -#define CHAR_FOR_EVT_UNSUBSCRIBE 'U' -#define CHAR_FOR_SUBCALL_CALL 'B' -#define CHAR_FOR_SUBCALL_REPLY 'R' -#define CHAR_FOR_DESCRIBE 'D' -#define CHAR_FOR_DESCRIPTION 'd' - -/******************* handling subcalls *****************************/ - -/** - * Structure on server side for recording pending - * subcalls. - */ -struct server_subcall -{ - struct server_subcall *next; /**< next subcall for the client */ - uint32_t subcallid; /**< the subcallid */ - void (*callback)(void*, int, struct json_object*); /**< callback on completion */ - void *closure; /**< closure of the callback */ -}; +#define CHAR_FOR_CALL 'K' /* client -> server */ +#define CHAR_FOR_REPLY 'k' /* server -> client */ +#define CHAR_FOR_EVT_BROADCAST 'B' /* server -> client */ +#define CHAR_FOR_EVT_ADD 'E' /* server -> client */ +#define CHAR_FOR_EVT_DEL 'e' /* server -> client */ +#define CHAR_FOR_EVT_PUSH 'P' /* server -> client */ +#define CHAR_FOR_EVT_SUBSCRIBE 'X' /* server -> client */ +#define CHAR_FOR_EVT_UNSUBSCRIBE 'x' /* server -> client */ +#define CHAR_FOR_EVT_UNEXPECTED 'U' /* client -> server */ +#define CHAR_FOR_DESCRIBE 'D' /* client -> server */ +#define CHAR_FOR_DESCRIPTION 'd' /* server -> client */ +#define CHAR_FOR_TOKEN_ADD 'T' /* client -> server */ +#define CHAR_FOR_TOKEN_DROP 't' /* client -> server */ +#define CHAR_FOR_SESSION_ADD 'S' /* client -> server */ +#define CHAR_FOR_SESSION_DROP 's' /* client -> server */ +#define CHAR_FOR_VERSION_OFFER 'V' /* client -> server */ +#define CHAR_FOR_VERSION_SET 'v' /* server -> client */ -/** - * Structure for sending back replies on client side - */ -struct afb_proto_ws_subcall -{ - struct afb_proto_ws *protows; /**< proto descriptor */ - void *buffer; - uint32_t subcallid; /**< subcallid for the reply */ -}; +/******************* manage versions *****************************/ + +#define WSAPI_IDENTIFIER 02723012011 /* wsapi: 23.19.1.16.9 */ + +#define WSAPI_VERSION_UNSET 0 +#define WSAPI_VERSION_1 1 + +#define WSAPI_VERSION_MIN WSAPI_VERSION_1 +#define WSAPI_VERSION_MAX WSAPI_VERSION_1 + +/******************* maximum count of ids ***********************/ + +#define ACTIVE_ID_MAX 4095 + +/******************* handling calls *****************************/ /* * structure for recording calls on client side */ struct client_call { struct client_call *next; /* the next call */ - struct afb_proto_ws *protows; /* the proto_ws */ - void *request; - uint32_t callid; /* the message identifier */ + void *request; /* the request closure */ + uint16_t callid; /* the message identifier */ }; /* * structure for a ws request */ struct afb_proto_ws_call { - struct client_call *next; /* the next call */ struct afb_proto_ws *protows; /* the client of the request */ - uint32_t refcount; /* reference count */ - uint32_t callid; /* the incoming request callid */ char *buffer; /* the incoming buffer */ + uint16_t refcount; /* reference count */ + uint16_t callid; /* the incoming request callid */ }; /* @@ -138,10 +131,9 @@ struct afb_proto_ws_call { struct client_describe { struct client_describe *next; - struct afb_proto_ws *protows; void (*callback)(void*, struct json_object*); void *closure; - uint32_t descid; + uint16_t descid; }; /* @@ -150,7 +142,7 @@ struct client_describe struct afb_proto_ws_describe { struct afb_proto_ws *protows; - uint32_t descid; + uint16_t descid; }; /******************* proto description for client or servers ******************/ @@ -158,10 +150,16 @@ struct afb_proto_ws_describe struct afb_proto_ws { /* count of references */ - int refcount; + uint16_t refcount; + + /* id generator */ + uint16_t genid; + + /* count actives ids */ + uint16_t idcount; - /* file descriptor */ - int fd; + /* version */ + uint8_t version; /* resource control */ pthread_mutex_t mutex; @@ -181,24 +179,26 @@ struct afb_proto_ws /* emitted calls (client side) */ struct client_call *calls; - /* pending subcalls (server side) */ - struct server_subcall *subcalls; - /* pending description (client side) */ struct client_describe *describes; /* on hangup callback */ void (*on_hangup)(void *closure); + + /* queuing facility for processing messages */ + int (*queuing)(struct afb_proto_ws *proto, void (*process)(int s, void *c), void *closure); }; /******************* streaming objects **********************************/ -#define WRITEBUF_COUNT_MAX 32 +#define WRITEBUF_COUNT_MAX 32 +#define WRITEBUF_BUFSZ (WRITEBUF_COUNT_MAX * sizeof(uint32_t)) + struct writebuf { + int iovcount, bufcount; struct iovec iovec[WRITEBUF_COUNT_MAX]; - uint32_t uints[WRITEBUF_COUNT_MAX]; - int count; + char buf[WRITEBUF_BUFSZ]; }; struct readbuf @@ -212,19 +212,7 @@ struct binary struct readbuf rb; }; -/******************* common useful tools **********************************/ - -/** - * translate a pointer to some integer - * @param ptr the pointer to translate - * @return an integer - */ -static inline uint32_t ptr2id(void *ptr) -{ - return (uint32_t)(((intptr_t)ptr) >> 6); -} - -/******************* serialisation part **********************************/ +/******************* serialization part **********************************/ static char *readbuf_get(struct readbuf *rb, uint32_t length) { @@ -236,43 +224,80 @@ static char *readbuf_get(struct readbuf *rb, uint32_t length) return before; } -static int readbuf_char(struct readbuf *rb, char *value) +static int readbuf_getat(struct readbuf *rb, void *to, uint32_t length) { - if (rb->head >= rb->end) + char *head = readbuf_get(rb, length); + if (!head) return 0; - *value = *rb->head++; + memcpy(to, head, length); return 1; } +__attribute__((unused)) +static int readbuf_char(struct readbuf *rb, char *value) +{ + return readbuf_getat(rb, value, sizeof *value); +} + static int readbuf_uint32(struct readbuf *rb, uint32_t *value) { - char *after = rb->head + sizeof *value; - if (after > rb->end) - return 0; - memcpy(value, rb->head, sizeof *value); - rb->head = after; - *value = le32toh(*value); - return 1; + int r = readbuf_getat(rb, value, sizeof *value); + if (r) + *value = le32toh(*value); + return r; } -static int readbuf_string(struct readbuf *rb, const char **value, size_t *length) +static int readbuf_uint16(struct readbuf *rb, uint16_t *value) +{ + int r = readbuf_getat(rb, value, sizeof *value); + if (r) + *value = le16toh(*value); + return r; +} + +static int readbuf_uint8(struct readbuf *rb, uint8_t *value) +{ + return readbuf_getat(rb, value, sizeof *value); +} + +static int _readbuf_string_(struct readbuf *rb, const char **value, size_t *length, int nulok) { uint32_t len; - if (!readbuf_uint32(rb, &len) || !len) + if (!readbuf_uint32(rb, &len)) return 0; + if (!len) { + if (!nulok) + return 0; + *value = NULL; + if (length) + *length = 0; + return 1; + } if (length) *length = (size_t)(len - 1); return (*value = readbuf_get(rb, len)) != NULL && rb->head[-1] == 0; } + +static int readbuf_string(struct readbuf *rb, const char **value, size_t *length) +{ + return _readbuf_string_(rb, value, length, 0); +} + +static int readbuf_nullstring(struct readbuf *rb, const char **value, size_t *length) +{ + return _readbuf_string_(rb, value, length, 1); +} + static int readbuf_object(struct readbuf *rb, struct json_object **object) { const char *string; struct json_object *o; + enum json_tokener_error jerr; int rc = readbuf_string(rb, &string, NULL); if (rc) { - o = json_tokener_parse(string); - if (o == NULL && strcmp(string, "null")) + o = json_tokener_parse_verbose(string, &jerr); + if (jerr != json_tokener_success) o = json_object_new_string(string); *object = o; } @@ -281,37 +306,66 @@ static int readbuf_object(struct readbuf *rb, struct json_object **object) static int writebuf_put(struct writebuf *wb, const void *value, size_t length) { - int i = wb->count; + int i = wb->iovcount; if (i == WRITEBUF_COUNT_MAX) return 0; wb->iovec[i].iov_base = (void*)value; wb->iovec[i].iov_len = length; - wb->count = i + 1; + wb->iovcount = i + 1; return 1; } -static int writebuf_char(struct writebuf *wb, char value) +static int writebuf_putbuf(struct writebuf *wb, const void *value, int length) { - int i = wb->count; - if (i == WRITEBUF_COUNT_MAX) + char *p; + int i = wb->iovcount, n = wb->bufcount, nafter; + + /* check enough length */ + nafter = n + length; + if (nafter > WRITEBUF_BUFSZ) + return 0; + + /* get where to store */ + p = &wb->buf[n]; + if (i && p == (((char*)wb->iovec[i - 1].iov_base) + wb->iovec[i - 1].iov_len)) + /* increase previous iovec */ + wb->iovec[i - 1].iov_len += (size_t)length; + else if (i == WRITEBUF_COUNT_MAX) + /* no more iovec */ return 0; - *(char*)&wb->uints[i] = value; - wb->iovec[i].iov_base = &wb->uints[i]; - wb->iovec[i].iov_len = 1; - wb->count = i + 1; + else { + /* new iovec */ + wb->iovec[i].iov_base = p; + wb->iovec[i].iov_len = (size_t)length; + wb->iovcount = i + 1; + } + /* store now */ + memcpy(p, value, (size_t)length); + wb->bufcount = nafter; return 1; } +__attribute__((unused)) +static int writebuf_char(struct writebuf *wb, char value) +{ + return writebuf_putbuf(wb, &value, 1); +} + static int writebuf_uint32(struct writebuf *wb, uint32_t value) { - int i = wb->count; - if (i == WRITEBUF_COUNT_MAX) - return 0; - wb->uints[i] = htole32(value); - wb->iovec[i].iov_base = &wb->uints[i]; - wb->iovec[i].iov_len = sizeof wb->uints[i]; - wb->count = i + 1; - return 1; + value = htole32(value); + return writebuf_putbuf(wb, &value, (int)sizeof value); +} + +static int writebuf_uint16(struct writebuf *wb, uint16_t value) +{ + value = htole16(value); + return writebuf_putbuf(wb, &value, (int)sizeof value); +} + +static int writebuf_uint8(struct writebuf *wb, uint8_t value) +{ + return writebuf_putbuf(wb, &value, (int)sizeof value); } static int writebuf_string_length(struct writebuf *wb, const char *value, size_t length) @@ -325,166 +379,150 @@ static int writebuf_string(struct writebuf *wb, const char *value) return writebuf_string_length(wb, value, strlen(value)); } +static int writebuf_nullstring(struct writebuf *wb, const char *value) +{ + return value ? writebuf_string_length(wb, value, strlen(value)) : writebuf_uint32(wb, 0); +} + static int writebuf_object(struct writebuf *wb, struct json_object *object) { const char *string = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN); return string != NULL && writebuf_string(wb, string); } -/******************* ws request part for server *****************/ +/******************* queuing of messages *****************/ -void afb_proto_ws_call_addref(struct afb_proto_ws_call *call) +/* queue the processing of the received message (except if size=0 cause it's not a valid message) */ +static void queue_message_processing(struct afb_proto_ws *protows, char *data, size_t size, void (*processing)(int,void*)) { - __atomic_add_fetch(&call->refcount, 1, __ATOMIC_RELAXED); + struct binary *binary; + + if (size) { + binary = malloc(sizeof *binary); + if (!binary) { + /* TODO process the problem */ + errno = ENOMEM; + } else { + binary->protows = protows; + binary->rb.base = data; + binary->rb.head = data; + binary->rb.end = data + size; + if (!protows->queuing + || protows->queuing(protows, processing, binary) < 0) + processing(0, binary); + return; + } + } + free(data); } -void afb_proto_ws_call_unref(struct afb_proto_ws_call *call) +/******************* sending messages *****************/ + +static int proto_write(struct afb_proto_ws *protows, struct writebuf *wb) { - if (__atomic_sub_fetch(&call->refcount, 1, __ATOMIC_RELAXED)) - return; + int rc; + struct afb_ws *ws; - afb_proto_ws_unref(call->protows); - free(call->buffer); - free(call); + pthread_mutex_lock(&protows->mutex); + ws = protows->ws; + if (ws == NULL) { + errno = EPIPE; + rc = -1; + } else { + rc = afb_ws_binary_v(ws, wb->iovec, wb->iovcount); + if (rc > 0) + rc = 0; + } + pthread_mutex_unlock(&protows->mutex); + return rc; } -int afb_proto_ws_call_success(struct afb_proto_ws_call *call, struct json_object *obj, const char *info) +static int send_version_offer_1(struct afb_proto_ws *protows, uint8_t version) { int rc = -1; - struct writebuf wb = { .count = 0 }; - struct afb_proto_ws *protows = call->protows; + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; - if (writebuf_char(&wb, CHAR_FOR_ANSWER_SUCCESS) - && writebuf_uint32(&wb, call->callid) - && writebuf_string(&wb, info ?: "") - && writebuf_object(&wb, obj)) { - pthread_mutex_lock(&protows->mutex); - rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); - pthread_mutex_unlock(&protows->mutex); - if (rc >= 0) { - rc = 0; - goto success; - } - } -success: + if (writebuf_char(&wb, CHAR_FOR_VERSION_OFFER) + && writebuf_uint32(&wb, WSAPI_IDENTIFIER) + && writebuf_uint8(&wb, 1) /* offer one version */ + && writebuf_uint8(&wb, version)) + rc = proto_write(protows, &wb); return rc; } -int afb_proto_ws_call_fail(struct afb_proto_ws_call *call, const char *status, const char *info) +static int send_version_set(struct afb_proto_ws *protows, uint8_t version) { int rc = -1; - struct writebuf wb = { .count = 0 }; - struct afb_proto_ws *protows = call->protows; + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; - if (writebuf_char(&wb, CHAR_FOR_ANSWER_FAIL) - && writebuf_uint32(&wb, call->callid) - && writebuf_string(&wb, status) - && writebuf_string(&wb, info ? : "")) { - pthread_mutex_lock(&protows->mutex); - rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); - pthread_mutex_unlock(&protows->mutex); - if (rc >= 0) { - rc = 0; - goto success; - } - } -success: + if (writebuf_char(&wb, CHAR_FOR_VERSION_SET) + && writebuf_uint8(&wb, version)) + rc = proto_write(protows, &wb); return rc; } -int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure) +/******************* ws request part for server *****************/ + +void afb_proto_ws_call_addref(struct afb_proto_ws_call *call) { - int rc = -1; - struct writebuf wb = { .count = 0 }; - struct server_subcall *sc, *osc; - struct afb_proto_ws *protows = call->protows; + __atomic_add_fetch(&call->refcount, 1, __ATOMIC_RELAXED); +} - sc = malloc(sizeof *sc); - if (!sc) - errno = ENOMEM; - else { - sc->callback = callback; - sc->closure = cb_closure; +void afb_proto_ws_call_unref(struct afb_proto_ws_call *call) +{ + if (__atomic_sub_fetch(&call->refcount, 1, __ATOMIC_RELAXED)) + return; - pthread_mutex_lock(&protows->mutex); - sc->subcallid = ptr2id(sc); - do { - sc->subcallid++; - osc = protows->subcalls; - while(osc && osc->subcallid != sc->subcallid) - osc = osc->next; - } while (osc); - sc->next = protows->subcalls; - protows->subcalls = sc; - pthread_mutex_unlock(&protows->mutex); + afb_proto_ws_unref(call->protows); + free(call->buffer); + free(call); +} - if (writebuf_char(&wb, CHAR_FOR_SUBCALL_CALL) - && writebuf_uint32(&wb, sc->subcallid) - && writebuf_uint32(&wb, call->callid) - && writebuf_string(&wb, api) - && writebuf_string(&wb, verb) - && writebuf_object(&wb, args)) { - pthread_mutex_lock(&protows->mutex); - rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); - pthread_mutex_unlock(&protows->mutex); - if (rc >= 0) { - rc = 0; - goto success; - } - } - } -success: +int afb_proto_ws_call_reply(struct afb_proto_ws_call *call, struct json_object *obj, const char *error, const char *info) +{ + int rc = -1; + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; + struct afb_proto_ws *protows = call->protows; + + if (writebuf_char(&wb, CHAR_FOR_REPLY) + && writebuf_uint16(&wb, call->callid) + && writebuf_nullstring(&wb, error) + && writebuf_nullstring(&wb, info) + && writebuf_object(&wb, obj)) + rc = proto_write(protows, &wb); return rc; } -int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, const char *event_name, int event_id) +int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, uint16_t event_id) { int rc = -1; - struct writebuf wb = { .count = 0 }; + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_EVT_SUBSCRIBE) - && writebuf_uint32(&wb, call->callid) - && writebuf_uint32(&wb, (uint32_t)event_id) - && writebuf_string(&wb, event_name)) { - pthread_mutex_lock(&protows->mutex); - rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); - pthread_mutex_unlock(&protows->mutex); - if (rc >= 0) { - rc = 0; - goto success; - } - } -success: + && writebuf_uint16(&wb, call->callid) + && writebuf_uint16(&wb, event_id)) + rc = proto_write(protows, &wb); return rc; } -int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *event_name, int event_id) +int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, uint16_t event_id) { int rc = -1; - struct writebuf wb = { .count = 0 }; + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE) - && writebuf_uint32(&wb, call->callid) - && writebuf_uint32(&wb, (uint32_t)event_id) - && writebuf_string(&wb, event_name)) { - pthread_mutex_lock(&protows->mutex); - rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); - pthread_mutex_unlock(&protows->mutex); - if (rc >= 0) { - rc = 0; - goto success; - } - } -success: + && writebuf_uint16(&wb, call->callid) + && writebuf_uint16(&wb, event_id)) + rc = proto_write(protows, &wb); return rc; } /******************* client part **********************************/ /* search a memorized call */ -static struct client_call *client_call_search_locked(struct afb_proto_ws *protows, uint32_t callid) +static struct client_call *client_call_search_locked(struct afb_proto_ws *protows, uint16_t callid) { struct client_call *call; @@ -495,7 +533,7 @@ static struct client_call *client_call_search_locked(struct afb_proto_ws *protow return call; } -static struct client_call *client_call_search_unlocked(struct afb_proto_ws *protows, uint32_t callid) +static struct client_call *client_call_search_unlocked(struct afb_proto_ws *protows, uint16_t callid) { struct client_call *result; @@ -506,217 +544,128 @@ static struct client_call *client_call_search_unlocked(struct afb_proto_ws *prot } /* free and release the memorizing call */ -static void client_call_destroy(struct client_call *call) +static void client_call_destroy(struct afb_proto_ws *protows, struct client_call *call) { struct client_call **prv; - struct afb_proto_ws *protows = call->protows; pthread_mutex_lock(&protows->mutex); - prv = &call->protows->calls; + prv = &protows->calls; while (*prv != NULL) { if (*prv == call) { + protows->idcount--; *prv = call->next; - break; + pthread_mutex_unlock(&protows->mutex); + free(call); + return; } prv = &(*prv)->next; } pthread_mutex_unlock(&protows->mutex); - free(call); -} - -/* get event data from the message */ -static int client_msg_event_read(struct readbuf *rb, uint32_t *eventid, const char **name) -{ - return readbuf_uint32(rb, eventid) && readbuf_string(rb, name, NULL); } /* get event from the message */ static int client_msg_call_get(struct afb_proto_ws *protows, struct readbuf *rb, struct client_call **call) { - uint32_t callid; + uint16_t callid; /* get event data from the message */ - if (!readbuf_uint32(rb, &callid)) { + if (!readbuf_uint16(rb, &callid)) return 0; - } /* get the call */ *call = client_call_search_unlocked(protows, callid); - if (*call == NULL) { - return 0; - } - - return 1; + return *call != NULL; } /* adds an event */ static void client_on_event_create(struct afb_proto_ws *protows, struct readbuf *rb) { const char *event_name; - uint32_t event_id; - - if (protows->client_itf->on_event_create && client_msg_event_read(rb, &event_id, &event_name)) - protows->client_itf->on_event_create(protows->closure, event_name, (int)event_id); + uint16_t event_id; + + if (protows->client_itf->on_event_create + && readbuf_uint16(rb, &event_id) + && readbuf_string(rb, &event_name, NULL)) + protows->client_itf->on_event_create(protows->closure, event_id, event_name); + else + ERROR("Ignoring creation of event"); } /* removes an event */ static void client_on_event_remove(struct afb_proto_ws *protows, struct readbuf *rb) { - const char *event_name; - uint32_t event_id; + uint16_t event_id; - if (protows->client_itf->on_event_remove && client_msg_event_read(rb, &event_id, &event_name)) - protows->client_itf->on_event_remove(protows->closure, event_name, (int)event_id); + if (protows->client_itf->on_event_remove && readbuf_uint16(rb, &event_id)) + protows->client_itf->on_event_remove(protows->closure, event_id); + else + ERROR("Ignoring deletion of event"); } /* subscribes an event */ static void client_on_event_subscribe(struct afb_proto_ws *protows, struct readbuf *rb) { - const char *event_name; - uint32_t event_id; + uint16_t event_id; struct client_call *call; - if (protows->client_itf->on_event_subscribe && client_msg_call_get(protows, rb, &call) && client_msg_event_read(rb, &event_id, &event_name)) - protows->client_itf->on_event_subscribe(protows->closure, call->request, event_name, (int)event_id); + if (protows->client_itf->on_event_subscribe && client_msg_call_get(protows, rb, &call) && readbuf_uint16(rb, &event_id)) + protows->client_itf->on_event_subscribe(protows->closure, call->request, event_id); + else + ERROR("Ignoring subscription to event"); } /* unsubscribes an event */ static void client_on_event_unsubscribe(struct afb_proto_ws *protows, struct readbuf *rb) { - const char *event_name; - uint32_t event_id; + uint16_t event_id; struct client_call *call; - if (protows->client_itf->on_event_unsubscribe && client_msg_call_get(protows, rb, &call) && client_msg_event_read(rb, &event_id, &event_name)) - protows->client_itf->on_event_unsubscribe(protows->closure, call->request, event_name, (int)event_id); + if (protows->client_itf->on_event_unsubscribe && client_msg_call_get(protows, rb, &call) && readbuf_uint16(rb, &event_id)) + protows->client_itf->on_event_unsubscribe(protows->closure, call->request, event_id); + else + ERROR("Ignoring unsubscription to event"); } /* receives broadcasted events */ static void client_on_event_broadcast(struct afb_proto_ws *protows, struct readbuf *rb) { - const char *event_name; + const char *event_name, *uuid; + uint8_t hop; struct json_object *object; - if (protows->client_itf->on_event_broadcast && readbuf_string(rb, &event_name, NULL) && readbuf_object(rb, &object)) - protows->client_itf->on_event_broadcast(protows->closure, event_name, object); + if (protows->client_itf->on_event_broadcast && readbuf_string(rb, &event_name, NULL) && readbuf_object(rb, &object) && (uuid = readbuf_get(rb, 16)) && readbuf_uint8(rb, &hop)) + protows->client_itf->on_event_broadcast(protows->closure, event_name, object, (unsigned char*)uuid, hop); + else + ERROR("Ignoring broadcast of event"); } /* pushs an event */ static void client_on_event_push(struct afb_proto_ws *protows, struct readbuf *rb) { - const char *event_name; - uint32_t event_id; + uint16_t event_id; struct json_object *object; - if (protows->client_itf->on_event_push && client_msg_event_read(rb, &event_id, &event_name) && readbuf_object(rb, &object)) - protows->client_itf->on_event_push(protows->closure, event_name, (int)event_id, object); + if (protows->client_itf->on_event_push && readbuf_uint16(rb, &event_id) && readbuf_object(rb, &object)) + protows->client_itf->on_event_push(protows->closure, event_id, object); + else + ERROR("Ignoring push of event"); } -static void client_on_reply_success(struct afb_proto_ws *protows, struct readbuf *rb) +static void client_on_reply(struct afb_proto_ws *protows, struct readbuf *rb) { struct client_call *call; struct json_object *object; - const char *info; - - if (!client_msg_call_get(protows, rb, &call)) - return; - - if (readbuf_string(rb, &info, NULL) && readbuf_object(rb, &object)) { - protows->client_itf->on_reply_success(protows->closure, call->request, object, info); - } else { - protows->client_itf->on_reply_fail(protows->closure, call->request, "proto-error", "can't process success"); - } - client_call_destroy(call); -} - -static void client_on_reply_fail(struct afb_proto_ws *protows, struct readbuf *rb) -{ - struct client_call *call; - const char *info, *status; + const char *error, *info; if (!client_msg_call_get(protows, rb, &call)) return; - - if (readbuf_string(rb, &status, NULL) && readbuf_string(rb, &info, NULL)) { - protows->client_itf->on_reply_fail(protows->closure, call->request, status, info); + if (readbuf_nullstring(rb, &error, NULL) && readbuf_nullstring(rb, &info, NULL) && readbuf_object(rb, &object)) { + protows->client_itf->on_reply(protows->closure, call->request, object, error, info); } else { - protows->client_itf->on_reply_fail(protows->closure, call->request, "proto-error", "can't process fail"); - } - client_call_destroy(call); -} - -/* send a subcall reply */ -static int client_send_subcall_reply(struct afb_proto_ws *protows, uint32_t subcallid, int status, json_object *object) -{ - struct writebuf wb = { .count = 0 }; - char ie = status < 0; - int rc; - - if (writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY) - && writebuf_uint32(&wb, subcallid) - && writebuf_char(&wb, ie) - && writebuf_object(&wb, object)) { - pthread_mutex_lock(&protows->mutex); - rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); - pthread_mutex_unlock(&protows->mutex); - if (rc >= 0) - return 0; - } - return -1; -} - -/* callback for subcall reply */ -int afb_proto_ws_subcall_reply(struct afb_proto_ws_subcall *subcall, int status, struct json_object *result) -{ - int rc = client_send_subcall_reply(subcall->protows, subcall->subcallid, status, result); - afb_proto_ws_unref(subcall->protows); - free(subcall->buffer); - free(subcall); - return rc; -} - -/* received a subcall request */ -static void client_on_subcall(struct afb_proto_ws *protows, struct readbuf *rb) -{ - struct afb_proto_ws_subcall *subcall; - struct client_call *call; - const char *api, *verb; - uint32_t subcallid; - struct json_object *object; - - /* get the subcallid */ - if (!readbuf_uint32(rb, &subcallid)) - return; - - /* if not expected drop it */ - if (!protows->client_itf->on_subcall) - goto error; - - /* retrieve the message data */ - if (!client_msg_call_get(protows, rb, &call)) - goto error; - - /* allocation of the subcall */ - subcall = malloc(sizeof *subcall); - if (!subcall) - goto error; - - /* make the call */ - if (readbuf_string(rb, &api, NULL) - && readbuf_string(rb, &verb, NULL) - && readbuf_object(rb, &object)) { - afb_proto_ws_addref(protows); - subcall->protows = protows; - subcall->subcallid = subcallid; - subcall->buffer = rb->base; - rb->base = NULL; - protows->client_itf->on_subcall(protows->closure, subcall, call->request, api, verb, object); - return; + protows->client_itf->on_reply(protows->closure, call->request, NULL, "proto-error", "can't process success"); } - free(subcall); -error: - client_send_subcall_reply(protows, subcallid, 1, NULL); + client_call_destroy(protows, call); } static void client_on_description(struct afb_proto_ws *protows, struct readbuf *rb) @@ -734,6 +683,7 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf * pthread_mutex_unlock(&protows->mutex); else { *prv = desc->next; + protows->idcount--; pthread_mutex_unlock(&protows->mutex); if (!readbuf_object(rb, &object)) object = NULL; @@ -743,6 +693,22 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf * } } +/* received a version set */ +static void client_on_version_set(struct afb_proto_ws *protows, struct readbuf *rb) +{ + uint8_t version; + + /* reads the descid */ + if (readbuf_uint8(rb, &version) + && WSAPI_VERSION_MIN <= version + && version <= WSAPI_VERSION_MAX) { + protows->version = version; + return; + } + afb_proto_ws_hangup(protows); +} + + /* callback when receiving binary data */ static void client_on_binary_job(int sig, void *closure) { @@ -750,11 +716,8 @@ static void client_on_binary_job(int sig, void *closure) if (!sig) { switch (*binary->rb.head++) { - case CHAR_FOR_ANSWER_SUCCESS: /* success */ - client_on_reply_success(binary->protows, &binary->rb); - break; - case CHAR_FOR_ANSWER_FAIL: /* fail */ - client_on_reply_fail(binary->protows, &binary->rb); + case CHAR_FOR_REPLY: /* reply */ + client_on_reply(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_BROADCAST: /* broadcast */ client_on_event_broadcast(binary->protows, &binary->rb); @@ -774,12 +737,12 @@ static void client_on_binary_job(int sig, void *closure) case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */ client_on_event_unsubscribe(binary->protows, &binary->rb); break; - case CHAR_FOR_SUBCALL_CALL: /* subcall */ - client_on_subcall(binary->protows, &binary->rb); - break; case CHAR_FOR_DESCRIPTION: /* description */ client_on_description(binary->protows, &binary->rb); break; + case CHAR_FOR_VERSION_SET: /* set the protocol version */ + client_on_version_set(binary->protows, &binary->rb); + break; default: /* unexpected message */ /* TODO: close the connection */ break; @@ -792,38 +755,63 @@ static void client_on_binary_job(int sig, void *closure) /* callback when receiving binary data */ static void client_on_binary(void *closure, char *data, size_t size) { - int rc; - struct binary *binary; + struct afb_proto_ws *protows = closure; - if (size) { - binary = malloc(sizeof *binary); - if (!binary) { - errno = ENOMEM; - } else { - binary->protows = closure; - binary->rb.base = data; - binary->rb.head = data; - binary->rb.end = data + size; - rc = jobs_queue(NULL, 0, client_on_binary_job, binary); - if (rc >= 0) - return; - free(binary); - } - } - free(data); + queue_message_processing(protows, data, size, client_on_binary_job); +} + +static int client_send_cmd_id16_optstr(struct afb_proto_ws *protows, char order, uint16_t id, const char *value) +{ + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; + int rc = -1; + + if (writebuf_char(&wb, order) + && writebuf_uint16(&wb, id) + && (!value || writebuf_string(&wb, value))) + rc = proto_write(protows, &wb); + return rc; +} + +int afb_proto_ws_client_session_create(struct afb_proto_ws *protows, uint16_t sessionid, const char *sessionstr) +{ + return client_send_cmd_id16_optstr(protows, CHAR_FOR_SESSION_ADD, sessionid, sessionstr); +} + +int afb_proto_ws_client_session_remove(struct afb_proto_ws *protows, uint16_t sessionid) +{ + return client_send_cmd_id16_optstr(protows, CHAR_FOR_SESSION_DROP, sessionid, NULL); +} + +int afb_proto_ws_client_token_create(struct afb_proto_ws *protows, uint16_t tokenid, const char *tokenstr) +{ + return client_send_cmd_id16_optstr(protows, CHAR_FOR_TOKEN_ADD, tokenid, tokenstr); + +} + +int afb_proto_ws_client_token_remove(struct afb_proto_ws *protows, uint16_t tokenid) +{ + return client_send_cmd_id16_optstr(protows, CHAR_FOR_TOKEN_DROP, tokenid, NULL); +} + +int afb_proto_ws_client_event_unexpected(struct afb_proto_ws *protows, uint16_t eventid) +{ + return client_send_cmd_id16_optstr(protows, CHAR_FOR_EVT_UNEXPECTED, eventid, NULL); } int afb_proto_ws_client_call( struct afb_proto_ws *protows, const char *verb, struct json_object *args, - const char *sessionid, - void *request + uint16_t sessionid, + uint16_t tokenid, + void *request, + const char *user_creds ) { int rc = -1; struct client_call *call; - struct writebuf wb = { .count = 0 }; + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; + uint16_t id; /* allocate call data */ call = malloc(sizeof *call); @@ -835,35 +823,39 @@ int afb_proto_ws_client_call( /* init call data */ pthread_mutex_lock(&protows->mutex); - call->callid = ptr2id(call); - while(client_call_search_locked(protows, call->callid) != NULL) - call->callid++; - call->protows = protows; + if (protows->idcount >= ACTIVE_ID_MAX) { + pthread_mutex_unlock(&protows->mutex); + errno = EBUSY; + goto clean; + } + protows->idcount++; + id = ++protows->genid; + while(!id || client_call_search_locked(protows, id) != NULL) + id++; + call->callid = protows->genid = id; call->next = protows->calls; protows->calls = call; pthread_mutex_unlock(&protows->mutex); /* creates the call message */ if (!writebuf_char(&wb, CHAR_FOR_CALL) - || !writebuf_uint32(&wb, call->callid) + || !writebuf_uint16(&wb, call->callid) || !writebuf_string(&wb, verb) - || !writebuf_string(&wb, sessionid) - || !writebuf_object(&wb, args)) { + || !writebuf_uint16(&wb, sessionid) + || !writebuf_uint16(&wb, tokenid) + || !writebuf_object(&wb, args) + || !writebuf_nullstring(&wb, user_creds)) { errno = EINVAL; goto clean; } /* send */ - pthread_mutex_lock(&protows->mutex); - rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); - pthread_mutex_unlock(&protows->mutex); - if (rc >= 0) { - rc = 0; + rc = proto_write(protows, &wb); + if (!rc) goto end; - } clean: - client_call_destroy(call); + client_call_destroy(protows, call); end: return rc; } @@ -872,7 +864,8 @@ end: int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(void*, struct json_object*), void *closure) { struct client_describe *desc, *d; - struct writebuf wb = { .count = 0 }; + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; + uint16_t id; desc = malloc(sizeof *desc); if (!desc) { @@ -882,30 +875,40 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)( /* fill in stack the description of the task */ pthread_mutex_lock(&protows->mutex); - desc->descid = ptr2id(desc); + if (protows->idcount >= ACTIVE_ID_MAX) { + errno = EBUSY; + goto busy; + } + protows->idcount++; + id = ++protows->genid; d = protows->describes; while (d) { - if (d->descid != desc->descid) + if (id && d->descid != id) d = d->next; else { - desc->descid++; + id++; d = protows->describes; } } + desc->descid = protows->genid = id; desc->callback = callback; desc->closure = closure; - desc->protows = protows; desc->next = protows->describes; protows->describes = desc; + pthread_mutex_unlock(&protows->mutex); /* send */ - if (writebuf_char(&wb, CHAR_FOR_DESCRIBE) - && writebuf_uint32(&wb, desc->descid) - && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0) { - pthread_mutex_unlock(&protows->mutex); - return 0; + if (!writebuf_char(&wb, CHAR_FOR_DESCRIBE) + || !writebuf_uint16(&wb, desc->descid)) { + errno = EINVAL; + goto error2; } + if (proto_write(protows, &wb) == 0) + return 0; + +error2: + pthread_mutex_lock(&protows->mutex); d = protows->describes; if (d == desc) protows->describes = desc->next; @@ -915,6 +918,8 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)( if (d) d->next = desc->next; } + protows->idcount--; +busy: pthread_mutex_unlock(&protows->mutex); free(desc); error: @@ -928,18 +933,20 @@ error: static void server_on_call(struct afb_proto_ws *protows, struct readbuf *rb) { struct afb_proto_ws_call *call; - const char *uuid, *verb; - uint32_t callid; + const char *verb, *user_creds; + uint16_t callid, sessionid, tokenid; size_t lenverb; struct json_object *object; afb_proto_ws_addref(protows); /* reads the call message data */ - if (!readbuf_uint32(rb, &callid) + if (!readbuf_uint16(rb, &callid) || !readbuf_string(rb, &verb, &lenverb) - || !readbuf_string(rb, &uuid, NULL) - || !readbuf_object(rb, &object)) + || !readbuf_uint16(rb, &sessionid) + || !readbuf_uint16(rb, &tokenid) + || !readbuf_object(rb, &object) + || !readbuf_nullstring(rb, &user_creds, NULL)) goto overflow; /* create the request */ @@ -953,7 +960,7 @@ static void server_on_call(struct afb_proto_ws *protows, struct readbuf *rb) call->buffer = rb->base; rb->base = NULL; /* don't free the buffer */ - protows->server_itf->on_call(protows->closure, call, verb, object, uuid); + protows->server_itf->on_call(protows->closure, call, verb, object, sessionid, tokenid, user_creds); return; out_of_memory: @@ -963,54 +970,16 @@ overflow: afb_proto_ws_unref(protows); } -/* on subcall reply */ -static void server_on_subcall_reply(struct afb_proto_ws *protows, struct readbuf *rb) -{ - char ie; - uint32_t subcallid; - struct json_object *object; - struct server_subcall *sc, **psc; - - /* reads the call message data */ - if (!readbuf_uint32(rb, &subcallid) - || !readbuf_char(rb, &ie) - || !readbuf_object(rb, &object)) { - /* TODO bad protocol */ - return; - } - - /* search the subcall and unlink it */ - pthread_mutex_lock(&protows->mutex); - psc = &protows->subcalls; - while ((sc = *psc) && sc->subcallid != subcallid) - psc = &sc->next; - if (!sc) { - pthread_mutex_unlock(&protows->mutex); - json_object_put(object); - /* TODO subcall not found */ - } else { - *psc = sc->next; - pthread_mutex_unlock(&protows->mutex); - sc->callback(sc->closure, -(int)ie, object); - free(sc); - } -} - static int server_send_description(struct afb_proto_ws *protows, uint32_t descid, struct json_object *descobj) { - int rc; - struct writebuf wb = { .count = 0 }; + int rc = -1; + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; if (writebuf_char(&wb, CHAR_FOR_DESCRIPTION) && writebuf_uint32(&wb, descid) - && writebuf_object(&wb, descobj)) { - pthread_mutex_lock(&protows->mutex); - rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); - pthread_mutex_unlock(&protows->mutex); - if (rc >= 0) - return 0; - } - return -1; + && writebuf_object(&wb, descobj)) + rc = proto_write(protows, &wb); + return rc; } int afb_proto_ws_describe_put(struct afb_proto_ws_describe *describe, struct json_object *description) @@ -1024,11 +993,11 @@ int afb_proto_ws_describe_put(struct afb_proto_ws_describe *describe, struct jso /* on describe, propagate it to the ws service */ static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb) { - uint32_t descid; + uint16_t descid; struct afb_proto_ws_describe *desc; /* reads the descid */ - if (readbuf_uint32(rb, &descid)) { + if (readbuf_uint16(rb, &descid)) { if (protows->server_itf->on_describe) { /* create asynchronous job */ desc = malloc(sizeof *desc); @@ -1044,6 +1013,81 @@ static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb) } } +static void server_on_session_add(struct afb_proto_ws *protows, struct readbuf *rb) +{ + uint16_t sessionid; + const char *sessionstr; + + if (readbuf_uint16(rb, &sessionid) && readbuf_string(rb, &sessionstr, NULL)) + protows->server_itf->on_session_create(protows->closure, sessionid, sessionstr); +} + +static void server_on_session_drop(struct afb_proto_ws *protows, struct readbuf *rb) +{ + uint16_t sessionid; + + if (readbuf_uint16(rb, &sessionid)) + protows->server_itf->on_session_remove(protows->closure, sessionid); +} + +static void server_on_token_add(struct afb_proto_ws *protows, struct readbuf *rb) +{ + uint16_t tokenid; + const char *tokenstr; + + if (readbuf_uint16(rb, &tokenid) && readbuf_string(rb, &tokenstr, NULL)) + protows->server_itf->on_token_create(protows->closure, tokenid, tokenstr); +} + +static void server_on_token_drop(struct afb_proto_ws *protows, struct readbuf *rb) +{ + uint16_t tokenid; + + if (readbuf_uint16(rb, &tokenid)) + protows->server_itf->on_token_remove(protows->closure, tokenid); +} + +static void server_on_event_unexpected(struct afb_proto_ws *protows, struct readbuf *rb) +{ + uint16_t eventid; + + if (readbuf_uint16(rb, &eventid)) + protows->server_itf->on_event_unexpected(protows->closure, eventid); +} + +/* on version offer */ +static void server_on_version_offer(struct afb_proto_ws *protows, struct readbuf *rb) +{ + uint8_t count; + uint8_t *versions; + uint8_t version; + uint8_t v; + uint32_t id; + + /* reads the descid */ + if (readbuf_uint32(rb, &id) + && id == WSAPI_IDENTIFIER + && readbuf_uint8(rb, &count) + && count > 0 + && (versions = (uint8_t*)readbuf_get(rb, (uint32_t)count))) { + version = WSAPI_VERSION_UNSET; + while (count) { + v = versions[--count]; + if (v >= WSAPI_VERSION_MIN + && v <= WSAPI_VERSION_MAX + && (version == WSAPI_VERSION_UNSET || version < v)) + version = v; + } + if (version != WSAPI_VERSION_UNSET) { + if (send_version_set(protows, version) >= 0) { + protows->version = version; + return; + } + } + } + afb_proto_ws_hangup(protows); +} + /* callback when receiving binary data */ static void server_on_binary_job(int sig, void *closure) { @@ -1054,12 +1098,27 @@ static void server_on_binary_job(int sig, void *closure) case CHAR_FOR_CALL: server_on_call(binary->protows, &binary->rb); break; - case CHAR_FOR_SUBCALL_REPLY: - server_on_subcall_reply(binary->protows, &binary->rb); - break; case CHAR_FOR_DESCRIBE: server_on_describe(binary->protows, &binary->rb); break; + case CHAR_FOR_SESSION_ADD: + server_on_session_add(binary->protows, &binary->rb); + break; + case CHAR_FOR_SESSION_DROP: + server_on_session_drop(binary->protows, &binary->rb); + break; + case CHAR_FOR_TOKEN_ADD: + server_on_token_add(binary->protows, &binary->rb); + break; + case CHAR_FOR_TOKEN_DROP: + server_on_token_drop(binary->protows, &binary->rb); + break; + case CHAR_FOR_EVT_UNEXPECTED: + server_on_event_unexpected(binary->protows, &binary->rb); + break; + case CHAR_FOR_VERSION_OFFER: + server_on_version_offer(binary->protows, &binary->rb); + break; default: /* unexpected message */ /* TODO: close the connection */ break; @@ -1071,65 +1130,56 @@ static void server_on_binary_job(int sig, void *closure) static void server_on_binary(void *closure, char *data, size_t size) { - int rc; - struct binary *binary; + struct afb_proto_ws *protows = closure; - if (size) { - binary = malloc(sizeof *binary); - if (!binary) { - errno = ENOMEM; - } else { - binary->protows = closure; - binary->rb.base = data; - binary->rb.head = data; - binary->rb.end = data + size; - rc = jobs_queue(NULL, 0, server_on_binary_job, binary); - if (rc >= 0) - return; - free(binary); - } - } - free(data); + queue_message_processing(protows, data, size, server_on_binary_job); } /******************* server part: manage events **********************************/ -static int server_event_send(struct afb_proto_ws *protows, char order, const char *event_name, int event_id, struct json_object *data) +static int server_event_send(struct afb_proto_ws *protows, char order, uint16_t event_id, const char *event_name, struct json_object *data) { - struct writebuf wb = { .count = 0 }; - int rc; + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; + int rc = -1; if (writebuf_char(&wb, order) - && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id)) - && writebuf_string(&wb, event_name) - && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data))) { - pthread_mutex_lock(&protows->mutex); - rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); - pthread_mutex_unlock(&protows->mutex); - if (rc >= 0) - return 0; - } - return -1; + && writebuf_uint16(&wb, event_id) + && (order != CHAR_FOR_EVT_ADD || writebuf_string(&wb, event_name)) + && (order != CHAR_FOR_EVT_PUSH || writebuf_object(&wb, data))) + rc = proto_write(protows, &wb); + return rc; } -int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, const char *event_name, int event_id) +int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, uint16_t event_id, const char *event_name) { - return server_event_send(protows, CHAR_FOR_EVT_ADD, event_name, event_id, NULL); + return server_event_send(protows, CHAR_FOR_EVT_ADD, event_id, event_name, NULL); } -int afb_proto_ws_server_event_remove(struct afb_proto_ws *protows, const char *event_name, int event_id) +int afb_proto_ws_server_event_remove(struct afb_proto_ws *protows, uint16_t event_id) { - return server_event_send(protows, CHAR_FOR_EVT_DEL, event_name, event_id, NULL); + return server_event_send(protows, CHAR_FOR_EVT_DEL, event_id, NULL, NULL); } -int afb_proto_ws_server_event_push(struct afb_proto_ws *protows, const char *event_name, int event_id, struct json_object *data) +int afb_proto_ws_server_event_push(struct afb_proto_ws *protows, uint16_t event_id, struct json_object *data) { - return server_event_send(protows, CHAR_FOR_EVT_PUSH, event_name, event_id, data); + return server_event_send(protows, CHAR_FOR_EVT_PUSH, event_id, NULL, data); } -int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char *event_name, struct json_object *data) +int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char *event_name, struct json_object *data, const unsigned char uuid[16], uint8_t hop) { - return server_event_send(protows, CHAR_FOR_EVT_BROADCAST, event_name, 0, data); + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; + int rc = -1; + + if (!hop) + return 0; + + if (writebuf_char(&wb, CHAR_FOR_EVT_BROADCAST) + && writebuf_string(&wb, event_name) + && writebuf_object(&wb, data) + && writebuf_put(&wb, uuid, 16) + && writebuf_uint8(&wb, (uint8_t)(hop - 1))) + rc = proto_write(protows, &wb); + return rc; } /*****************************************************/ @@ -1138,18 +1188,27 @@ int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char static void on_hangup(void *closure) { struct afb_proto_ws *protows = closure; - struct server_subcall *sc, *nsc; struct client_describe *cd, *ncd; + struct client_call *call, *ncall; + struct afb_ws *ws; + + pthread_mutex_lock(&protows->mutex); + ncd = protows->describes; + protows->describes = NULL; + ncall = protows->calls; + protows->calls = NULL; + ws = protows->ws; + protows->ws = NULL; + protows->idcount = 0; + pthread_mutex_unlock(&protows->mutex); - nsc = protows->subcalls; - while (nsc) { - sc= nsc; - nsc = sc->next; - sc->callback(sc->closure, 1, NULL); - free(sc); + while (ncall) { + call= ncall; + ncall = call->next; + protows->client_itf->on_reply(protows->closure, call->request, NULL, "disconnected", "server hung up"); + free(call); } - ncd = protows->describes; while (ncd) { cd= ncd; ncd = cd->next; @@ -1157,9 +1216,8 @@ static void on_hangup(void *closure) free(cd); } - if (protows->fd >= 0) { - close(protows->fd); - protows->fd = -1; + if (ws) { + afb_ws_destroy(ws); if (protows->on_hangup) protows->on_hangup(protows->closure); } @@ -1187,7 +1245,7 @@ static const struct afb_ws_itf server_ws_itf = /*****************************************************/ -static struct afb_proto_ws *afb_proto_ws_create(struct sd_event *eloop, int fd, const struct afb_proto_ws_server_itf *itfs, const struct afb_proto_ws_client_itf *itfc, void *closure, const struct afb_ws_itf *itf) +static struct afb_proto_ws *afb_proto_ws_create(struct fdev *fdev, const struct afb_proto_ws_server_itf *itfs, const struct afb_proto_ws_client_itf *itfc, void *closure, const struct afb_ws_itf *itf) { struct afb_proto_ws *protows; @@ -1195,13 +1253,12 @@ static struct afb_proto_ws *afb_proto_ws_create(struct sd_event *eloop, int fd, if (protows == NULL) errno = ENOMEM; else { - fcntl(fd, F_SETFD, FD_CLOEXEC); - fcntl(fd, F_SETFL, O_NONBLOCK); - protows->ws = afb_ws_create(eloop, fd, itf, protows); + fcntl(fdev_fd(fdev), F_SETFD, FD_CLOEXEC); + fcntl(fdev_fd(fdev), F_SETFL, O_NONBLOCK); + protows->ws = afb_ws_create(fdev, itf, protows); if (protows->ws != NULL) { - protows->fd = fd; protows->refcount = 1; - protows->subcalls = NULL; + protows->version = WSAPI_VERSION_UNSET; protows->closure = closure; protows->server_itf = itfs; protows->client_itf = itfc; @@ -1213,21 +1270,29 @@ static struct afb_proto_ws *afb_proto_ws_create(struct sd_event *eloop, int fd, return NULL; } -struct afb_proto_ws *afb_proto_ws_create_client(struct sd_event *eloop, int fd, const struct afb_proto_ws_client_itf *itf, void *closure) +struct afb_proto_ws *afb_proto_ws_create_client(struct fdev *fdev, const struct afb_proto_ws_client_itf *itf, void *closure) { - return afb_proto_ws_create(eloop, fd, NULL, itf, closure, &proto_ws_client_ws_itf); + struct afb_proto_ws *protows; + + protows = afb_proto_ws_create(fdev, NULL, itf, closure, &proto_ws_client_ws_itf); + if (protows) { + if (send_version_offer_1(protows, WSAPI_VERSION_1) != 0) { + afb_proto_ws_unref(protows); + protows = NULL; + } + } + return protows; } -struct afb_proto_ws *afb_proto_ws_create_server(struct sd_event *eloop, int fd, const struct afb_proto_ws_server_itf *itf, void *closure) +struct afb_proto_ws *afb_proto_ws_create_server(struct fdev *fdev, const struct afb_proto_ws_server_itf *itf, void *closure) { - return afb_proto_ws_create(eloop, fd, itf, NULL, closure, &server_ws_itf); + return afb_proto_ws_create(fdev, itf, NULL, closure, &server_ws_itf); } void afb_proto_ws_unref(struct afb_proto_ws *protows) { - if (!__atomic_sub_fetch(&protows->refcount, 1, __ATOMIC_RELAXED)) { + if (protows && !__atomic_sub_fetch(&protows->refcount, 1, __ATOMIC_RELAXED)) { afb_proto_ws_hangup(protows); - afb_ws_destroy(protows->ws); pthread_mutex_destroy(&protows->mutex); free(protows); } @@ -1250,7 +1315,8 @@ int afb_proto_ws_is_server(struct afb_proto_ws *protows) void afb_proto_ws_hangup(struct afb_proto_ws *protows) { - afb_ws_hangup(protows->ws); + if (protows->ws) + afb_ws_hangup(protows->ws); } void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangup)(void *closure)) @@ -1258,3 +1324,7 @@ void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangup)(void protows->on_hangup = on_hangup; } +void afb_proto_ws_set_queuing(struct afb_proto_ws *protows, int (*queuing)(struct afb_proto_ws*, void (*)(int,void*), void*)) +{ + protows->queuing = queuing; +}