X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fafb-proto-ws.c;h=86112a323eea2349f0b7740c06af7be116e72920;hb=7b6940f1524cac6172e71529a989424ff18fb850;hp=90c3b05a05598742f6e8e659693161d67c57ec49;hpb=760f61ba4523d5c3461a0c30bbda78c84ab80103;p=src%2Fapp-framework-binder.git diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c index 90c3b05a..86112a32 100644 --- a/src/afb-proto-ws.c +++ b/src/afb-proto-ws.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015, 2016, 2017 "IoT.bzh" + * Copyright (C) 2015-2018 "IoT.bzh" * Author José Bollo * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,8 +17,6 @@ #define _GNU_SOURCE -#define NO_PLUGIN_VERBOSE_MACRO - #include #include #include @@ -37,6 +35,8 @@ #include "afb-ws.h" #include "afb-msg-json.h" #include "afb-proto-ws.h" +#include "jobs.h" +#include "fdev.h" struct afb_proto_ws; @@ -59,8 +59,6 @@ The server must reply to the previous actions by The server can also within the context of a call - - make a subcall - - subscribe or unsubscribe an event For the purpose of handling events the server can: @@ -73,42 +71,17 @@ For the purpose of handling events the server can: /************** constants for protocol definition *************************/ #define CHAR_FOR_CALL 'C' -#define CHAR_FOR_ANSWER_SUCCESS 'T' -#define CHAR_FOR_ANSWER_FAIL 'F' +#define CHAR_FOR_REPLY 'Y' #define CHAR_FOR_EVT_BROADCAST '*' #define CHAR_FOR_EVT_ADD '+' #define CHAR_FOR_EVT_DEL '-' #define CHAR_FOR_EVT_PUSH '!' #define CHAR_FOR_EVT_SUBSCRIBE 'S' #define CHAR_FOR_EVT_UNSUBSCRIBE 'U' -#define CHAR_FOR_SUBCALL_CALL 'B' -#define CHAR_FOR_SUBCALL_REPLY 'R' #define CHAR_FOR_DESCRIBE 'D' #define CHAR_FOR_DESCRIPTION 'd' -/******************* handling subcalls *****************************/ - -/** - * Structure on server side for recording pending - * subcalls. - */ -struct server_subcall -{ - struct server_subcall *next; /**< next subcall for the client */ - uint32_t subcallid; /**< the subcallid */ - void (*callback)(void*, int, struct json_object*); /**< callback on completion */ - void *closure; /**< closure of the callback */ -}; - -/** - * Structure for sending back replies on client side - */ -struct afb_proto_ws_subcall -{ - struct afb_proto_ws *protows; /**< proto descriptor */ - void *buffer; - uint32_t subcallid; /**< subcallid for the reply */ -}; +/******************* handling calls *****************************/ /* * structure for recording calls on client side @@ -116,7 +89,7 @@ struct afb_proto_ws_subcall struct client_call { struct client_call *next; /* the next call */ struct afb_proto_ws *protows; /* the proto_ws */ - void *request; + void *request; /* the request closure */ uint32_t callid; /* the message identifier */ }; @@ -160,7 +133,7 @@ struct afb_proto_ws int refcount; /* file descriptor */ - int fd; + struct fdev *fdev; /* resource control */ pthread_mutex_t mutex; @@ -180,14 +153,35 @@ struct afb_proto_ws /* emitted calls (client side) */ struct client_call *calls; - /* pending subcalls (server side) */ - struct server_subcall *subcalls; - /* pending description (client side) */ struct client_describe *describes; /* on hangup callback */ void (*on_hangup)(void *closure); + + /* queuing facility for processing messages */ + int (*queuing)(void (*process)(int s, void *c), void *closure); +}; + +/******************* streaming objects **********************************/ + +#define WRITEBUF_COUNT_MAX 32 +struct writebuf +{ + struct iovec iovec[WRITEBUF_COUNT_MAX]; + uint32_t uints[WRITEBUF_COUNT_MAX]; + int count; +}; + +struct readbuf +{ + char *base, *head, *end; +}; + +struct binary +{ + struct afb_proto_ws *protows; + struct readbuf rb; }; /******************* common useful tools **********************************/ @@ -204,19 +198,6 @@ static inline uint32_t ptr2id(void *ptr) /******************* serialisation part **********************************/ -struct readbuf -{ - char *base, *head, *end; -}; - -#define WRITEBUF_COUNT_MAX 32 -struct writebuf -{ - struct iovec iovec[WRITEBUF_COUNT_MAX]; - uint32_t uints[WRITEBUF_COUNT_MAX]; - int count; -}; - static char *readbuf_get(struct readbuf *rb, uint32_t length) { char *before = rb->head; @@ -227,6 +208,7 @@ static char *readbuf_get(struct readbuf *rb, uint32_t length) return before; } +__attribute__((unused)) static int readbuf_char(struct readbuf *rb, char *value) { if (rb->head >= rb->end) @@ -246,24 +228,44 @@ static int readbuf_uint32(struct readbuf *rb, uint32_t *value) return 1; } -static int readbuf_string(struct readbuf *rb, const char **value, size_t *length) +static int _readbuf_string_(struct readbuf *rb, const char **value, size_t *length, int nulok) { uint32_t len; - if (!readbuf_uint32(rb, &len) || !len) + if (!readbuf_uint32(rb, &len)) return 0; + if (!len) { + if (!nulok) + return 0; + *value = NULL; + if (length) + *length = 0; + return 1; + } if (length) *length = (size_t)(len - 1); return (*value = readbuf_get(rb, len)) != NULL && rb->head[-1] == 0; } + +static int readbuf_string(struct readbuf *rb, const char **value, size_t *length) +{ + return _readbuf_string_(rb, value, length, 0); +} + +static int readbuf_nullstring(struct readbuf *rb, const char **value, size_t *length) +{ + return _readbuf_string_(rb, value, length, 1); +} + static int readbuf_object(struct readbuf *rb, struct json_object **object) { const char *string; struct json_object *o; + enum json_tokener_error jerr; int rc = readbuf_string(rb, &string, NULL); if (rc) { - o = json_tokener_parse(string); - if (o == NULL && strcmp(string, "null")) + o = json_tokener_parse_verbose(string, &jerr); + if (jerr != json_tokener_success) o = json_object_new_string(string); *object = o; } @@ -316,12 +318,43 @@ static int writebuf_string(struct writebuf *wb, const char *value) return writebuf_string_length(wb, value, strlen(value)); } +static int writebuf_nullstring(struct writebuf *wb, const char *value) +{ + return value ? writebuf_string_length(wb, value, strlen(value)) : writebuf_uint32(wb, 0); +} + static int writebuf_object(struct writebuf *wb, struct json_object *object) { const char *string = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN); return string != NULL && writebuf_string(wb, string); } +/******************* queuing of messages *****************/ + +/* queue the processing of the received message (except if size=0 cause it's not a valid message) */ +static void queue_message_processing(struct afb_proto_ws *protows, char *data, size_t size, void (*processing)(int,void*)) +{ + struct binary *binary; + + if (size) { + binary = malloc(sizeof *binary); + if (!binary) { + /* TODO process the problem */ + errno = ENOMEM; + } else { + binary->protows = protows; + binary->rb.base = data; + binary->rb.head = data; + binary->rb.end = data + size; + if (!protows->queuing + || protows->queuing(processing, binary) < 0) + processing(0, binary); + return; + } + } + free(data); +} + /******************* ws request part for server *****************/ void afb_proto_ws_call_addref(struct afb_proto_ws_call *call) @@ -339,15 +372,16 @@ void afb_proto_ws_call_unref(struct afb_proto_ws_call *call) free(call); } -int afb_proto_ws_call_success(struct afb_proto_ws_call *call, struct json_object *obj, const char *info) +int afb_proto_ws_call_reply(struct afb_proto_ws_call *call, struct json_object *obj, const char *error, const char *info) { int rc = -1; struct writebuf wb = { .count = 0 }; struct afb_proto_ws *protows = call->protows; - if (writebuf_char(&wb, CHAR_FOR_ANSWER_SUCCESS) + if (writebuf_char(&wb, CHAR_FOR_REPLY) && writebuf_uint32(&wb, call->callid) - && writebuf_string(&wb, info ?: "") + && writebuf_nullstring(&wb, error) + && writebuf_nullstring(&wb, info) && writebuf_object(&wb, obj)) { pthread_mutex_lock(&protows->mutex); rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); @@ -361,73 +395,6 @@ success: return rc; } -int afb_proto_ws_call_fail(struct afb_proto_ws_call *call, const char *status, const char *info) -{ - int rc = -1; - struct writebuf wb = { .count = 0 }; - struct afb_proto_ws *protows = call->protows; - - if (writebuf_char(&wb, CHAR_FOR_ANSWER_FAIL) - && writebuf_uint32(&wb, call->callid) - && writebuf_string(&wb, status) - && writebuf_string(&wb, info ? : "")) { - pthread_mutex_lock(&protows->mutex); - rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); - pthread_mutex_unlock(&protows->mutex); - if (rc >= 0) { - rc = 0; - goto success; - } - } -success: - return rc; -} - -int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure) -{ - int rc = -1; - struct writebuf wb = { .count = 0 }; - struct server_subcall *sc, *osc; - struct afb_proto_ws *protows = call->protows; - - sc = malloc(sizeof *sc); - if (!sc) - errno = ENOMEM; - else { - sc->callback = callback; - sc->closure = cb_closure; - - pthread_mutex_lock(&protows->mutex); - sc->subcallid = ptr2id(sc); - do { - sc->subcallid++; - osc = protows->subcalls; - while(osc && osc->subcallid != sc->subcallid) - osc = osc->next; - } while (osc); - sc->next = protows->subcalls; - protows->subcalls = sc; - pthread_mutex_unlock(&protows->mutex); - - if (writebuf_char(&wb, CHAR_FOR_SUBCALL_CALL) - && writebuf_uint32(&wb, sc->subcallid) - && writebuf_uint32(&wb, call->callid) - && writebuf_string(&wb, api) - && writebuf_string(&wb, verb) - && writebuf_object(&wb, args)) { - pthread_mutex_lock(&protows->mutex); - rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); - pthread_mutex_unlock(&protows->mutex); - if (rc >= 0) { - rc = 0; - goto success; - } - } - } -success: - return rc; -} - int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, const char *event_name, int event_id) { int rc = -1; @@ -603,113 +570,23 @@ static void client_on_event_push(struct afb_proto_ws *protows, struct readbuf *r protows->client_itf->on_event_push(protows->closure, event_name, (int)event_id, object); } -static void client_on_reply_success(struct afb_proto_ws *protows, struct readbuf *rb) +static void client_on_reply(struct afb_proto_ws *protows, struct readbuf *rb) { struct client_call *call; struct json_object *object; - const char *info; - - if (!client_msg_call_get(protows, rb, &call)) - return; - - if (readbuf_string(rb, &info, NULL) && readbuf_object(rb, &object)) { - protows->client_itf->on_reply_success(protows->closure, call->request, object, info); - } else { - protows->client_itf->on_reply_fail(protows->closure, call->request, "proto-error", "can't process success"); - } - client_call_destroy(call); -} - -static void client_on_reply_fail(struct afb_proto_ws *protows, struct readbuf *rb) -{ - struct client_call *call; - const char *info, *status; + const char *error, *info; if (!client_msg_call_get(protows, rb, &call)) return; - - if (readbuf_string(rb, &status, NULL) && readbuf_string(rb, &info, NULL)) { - protows->client_itf->on_reply_fail(protows->closure, call->request, status, info); + if (readbuf_nullstring(rb, &error, NULL) && readbuf_nullstring(rb, &info, NULL) && readbuf_object(rb, &object)) { + protows->client_itf->on_reply(protows->closure, call->request, object, error, info); } else { - protows->client_itf->on_reply_fail(protows->closure, call->request, "proto-error", "can't process fail"); + protows->client_itf->on_reply(protows->closure, call->request, NULL, "proto-error", "can't process success"); } client_call_destroy(call); } -/* send a subcall reply */ -static int client_send_subcall_reply(struct afb_proto_ws *protows, uint32_t subcallid, int status, json_object *object) -{ - struct writebuf wb = { .count = 0 }; - char ie = status < 0; - int rc; - - if (writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY) - && writebuf_uint32(&wb, subcallid) - && writebuf_char(&wb, ie) - && writebuf_object(&wb, object)) { - pthread_mutex_lock(&protows->mutex); - rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); - pthread_mutex_unlock(&protows->mutex); - if (rc >= 0) - return 0; - } - return -1; -} - -/* callback for subcall reply */ -int afb_proto_ws_subcall_reply(struct afb_proto_ws_subcall *subcall, int status, struct json_object *result) -{ - int rc = client_send_subcall_reply(subcall->protows, subcall->subcallid, status, result); - afb_proto_ws_unref(subcall->protows); - free(subcall->buffer); - free(subcall); - return rc; -} - -/* received a subcall request */ -static void client_on_subcall(struct afb_proto_ws *protows, struct readbuf *rb) -{ - struct afb_proto_ws_subcall *subcall; - struct client_call *call; - const char *api, *verb; - uint32_t subcallid; - struct json_object *object; - - /* get the subcallid */ - if (!readbuf_uint32(rb, &subcallid)) - return; - - /* if not expected drop it */ - if (!protows->client_itf->on_subcall) - goto error; - - /* retrieve the message data */ - if (!client_msg_call_get(protows, rb, &call)) - goto error; - - /* allocation of the subcall */ - subcall = malloc(sizeof *subcall); - if (!subcall) - goto error; - - /* make the call */ - if (readbuf_string(rb, &api, NULL) - && readbuf_string(rb, &verb, NULL) - && readbuf_object(rb, &object)) { - afb_proto_ws_addref(protows); - subcall->protows = protows; - subcall->subcallid = subcallid; - subcall->buffer = rb->base; - rb->base = NULL; - protows->client_itf->on_subcall(protows->closure, subcall, call->request, api, verb, object); - return; - } - free(subcall); -error: - client_send_subcall_reply(protows, subcallid, 1, NULL); -} - static void client_on_description(struct afb_proto_ws *protows, struct readbuf *rb) { uint32_t descid; @@ -735,54 +612,51 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf * } /* callback when receiving binary data */ -static void client_on_binary(void *closure, char *data, size_t size) +static void client_on_binary_job(int sig, void *closure) { - struct afb_proto_ws *protows; - struct readbuf rb; + struct binary *binary = closure; - rb.base = data; - if (size > 0) { - rb.head = data; - rb.end = data + size; - protows = closure; - - switch (*rb.head++) { - case CHAR_FOR_ANSWER_SUCCESS: /* success */ - client_on_reply_success(protows, &rb); - break; - case CHAR_FOR_ANSWER_FAIL: /* fail */ - client_on_reply_fail(protows, &rb); + if (!sig) { + switch (*binary->rb.head++) { + case CHAR_FOR_REPLY: /* reply */ + client_on_reply(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_BROADCAST: /* broadcast */ - client_on_event_broadcast(protows, &rb); + client_on_event_broadcast(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_ADD: /* creates the event */ - client_on_event_create(protows, &rb); + client_on_event_create(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_DEL: /* removes the event */ - client_on_event_remove(protows, &rb); + client_on_event_remove(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_PUSH: /* pushs the event */ - client_on_event_push(protows, &rb); + client_on_event_push(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */ - client_on_event_subscribe(protows, &rb); + client_on_event_subscribe(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */ - client_on_event_unsubscribe(protows, &rb); - break; - case CHAR_FOR_SUBCALL_CALL: /* subcall */ - client_on_subcall(protows, &rb); + client_on_event_unsubscribe(binary->protows, &binary->rb); break; case CHAR_FOR_DESCRIPTION: /* description */ - client_on_description(protows, &rb); + client_on_description(binary->protows, &binary->rb); break; default: /* unexpected message */ /* TODO: close the connection */ break; } } - free(rb.base); + free(binary->rb.base); + free(binary); +} + +/* callback when receiving binary data */ +static void client_on_binary(void *closure, char *data, size_t size) +{ + struct afb_proto_ws *protows = closure; + + queue_message_processing(protows, data, size, client_on_binary_job); } int afb_proto_ws_client_call( @@ -790,7 +664,8 @@ int afb_proto_ws_client_call( const char *verb, struct json_object *args, const char *sessionid, - void *request + void *request, + const char *user_creds ) { int rc = -1; @@ -820,7 +695,8 @@ int afb_proto_ws_client_call( || !writebuf_uint32(&wb, call->callid) || !writebuf_string(&wb, verb) || !writebuf_string(&wb, sessionid) - || !writebuf_object(&wb, args)) { + || !writebuf_object(&wb, args) + || !writebuf_nullstring(&wb, user_creds)) { errno = EINVAL; goto clean; } @@ -900,7 +776,7 @@ error: static void server_on_call(struct afb_proto_ws *protows, struct readbuf *rb) { struct afb_proto_ws_call *call; - const char *uuid, *verb; + const char *uuid, *verb, *user_creds; uint32_t callid; size_t lenverb; struct json_object *object; @@ -911,7 +787,8 @@ static void server_on_call(struct afb_proto_ws *protows, struct readbuf *rb) if (!readbuf_uint32(rb, &callid) || !readbuf_string(rb, &verb, &lenverb) || !readbuf_string(rb, &uuid, NULL) - || !readbuf_object(rb, &object)) + || !readbuf_object(rb, &object) + || !readbuf_nullstring(rb, &user_creds, NULL)) goto overflow; /* create the request */ @@ -925,7 +802,7 @@ static void server_on_call(struct afb_proto_ws *protows, struct readbuf *rb) call->buffer = rb->base; rb->base = NULL; /* don't free the buffer */ - protows->server_itf->on_call(protows->closure, call, verb, object, uuid); + protows->server_itf->on_call(protows->closure, call, verb, object, uuid, user_creds); return; out_of_memory: @@ -935,39 +812,6 @@ overflow: afb_proto_ws_unref(protows); } -/* on subcall reply */ -static void server_on_subcall_reply(struct afb_proto_ws *protows, struct readbuf *rb) -{ - char ie; - uint32_t subcallid; - struct json_object *object; - struct server_subcall *sc, **psc; - - /* reads the call message data */ - if (!readbuf_uint32(rb, &subcallid) - || !readbuf_char(rb, &ie) - || !readbuf_object(rb, &object)) { - /* TODO bad protocol */ - return; - } - - /* search the subcall and unlink it */ - pthread_mutex_lock(&protows->mutex); - psc = &protows->subcalls; - while ((sc = *psc) && sc->subcallid != subcallid) - psc = &sc->next; - if (!sc) { - pthread_mutex_unlock(&protows->mutex); - json_object_put(object); - /* TODO subcall not found */ - } else { - *psc = sc->next; - pthread_mutex_unlock(&protows->mutex); - sc->callback(sc->closure, -(int)ie, object); - free(sc); - } -} - static int server_send_description(struct afb_proto_ws *protows, uint32_t descid, struct json_object *descobj) { int rc; @@ -1017,33 +861,32 @@ static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb) } /* callback when receiving binary data */ -static void server_on_binary(void *closure, char *data, size_t size) +static void server_on_binary_job(int sig, void *closure) { - struct afb_proto_ws *protows; - struct readbuf rb; + struct binary *binary = closure; - rb.base = data; - if (size > 0) { - rb.head = data; - rb.end = data + size; - protows = closure; - - switch (*rb.head++) { + if (!sig) { + switch (*binary->rb.head++) { case CHAR_FOR_CALL: - server_on_call(protows, &rb); - break; - case CHAR_FOR_SUBCALL_REPLY: - server_on_subcall_reply(protows, &rb); + server_on_call(binary->protows, &binary->rb); break; case CHAR_FOR_DESCRIBE: - server_on_describe(protows, &rb); + server_on_describe(binary->protows, &binary->rb); break; default: /* unexpected message */ /* TODO: close the connection */ break; } } - free(rb.base); + free(binary->rb.base); + free(binary); +} + +static void server_on_binary(void *closure, char *data, size_t size) +{ + struct afb_proto_ws *protows = closure; + + queue_message_processing(protows, data, size, server_on_binary_job); } /******************* server part: manage events **********************************/ @@ -1092,18 +935,19 @@ int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char static void on_hangup(void *closure) { struct afb_proto_ws *protows = closure; - struct server_subcall *sc, *nsc; struct client_describe *cd, *ncd; + struct client_call *call, *ncall; + + ncd = __atomic_exchange_n(&protows->describes, NULL, __ATOMIC_RELAXED); + ncall = __atomic_exchange_n(&protows->calls, NULL, __ATOMIC_RELAXED); - nsc = protows->subcalls; - while (nsc) { - sc= nsc; - nsc = sc->next; - sc->callback(sc->closure, 1, NULL); - free(sc); + while (ncall) { + call= ncall; + ncall = call->next; + protows->client_itf->on_reply(protows->closure, call->request, NULL, "disconnected", "server hung up"); + free(call); } - ncd = protows->describes; while (ncd) { cd= ncd; ncd = cd->next; @@ -1111,9 +955,9 @@ static void on_hangup(void *closure) free(cd); } - if (protows->fd >= 0) { - close(protows->fd); - protows->fd = -1; + if (protows->fdev) { + fdev_unref(protows->fdev); + protows->fdev = 0; if (protows->on_hangup) protows->on_hangup(protows->closure); } @@ -1141,7 +985,7 @@ static const struct afb_ws_itf server_ws_itf = /*****************************************************/ -static struct afb_proto_ws *afb_proto_ws_create(struct sd_event *eloop, int fd, const struct afb_proto_ws_server_itf *itfs, const struct afb_proto_ws_client_itf *itfc, void *closure, const struct afb_ws_itf *itf) +static struct afb_proto_ws *afb_proto_ws_create(struct fdev *fdev, const struct afb_proto_ws_server_itf *itfs, const struct afb_proto_ws_client_itf *itfc, void *closure, const struct afb_ws_itf *itf) { struct afb_proto_ws *protows; @@ -1149,13 +993,12 @@ static struct afb_proto_ws *afb_proto_ws_create(struct sd_event *eloop, int fd, if (protows == NULL) errno = ENOMEM; else { - fcntl(fd, F_SETFD, FD_CLOEXEC); - fcntl(fd, F_SETFL, O_NONBLOCK); - protows->ws = afb_ws_create(eloop, fd, itf, protows); + fcntl(fdev_fd(fdev), F_SETFD, FD_CLOEXEC); + fcntl(fdev_fd(fdev), F_SETFL, O_NONBLOCK); + protows->ws = afb_ws_create(fdev, itf, protows); if (protows->ws != NULL) { - protows->fd = fd; + protows->fdev = fdev; protows->refcount = 1; - protows->subcalls = NULL; protows->closure = closure; protows->server_itf = itfs; protows->client_itf = itfc; @@ -1167,19 +1010,19 @@ static struct afb_proto_ws *afb_proto_ws_create(struct sd_event *eloop, int fd, return NULL; } -struct afb_proto_ws *afb_proto_ws_create_client(struct sd_event *eloop, int fd, const struct afb_proto_ws_client_itf *itf, void *closure) +struct afb_proto_ws *afb_proto_ws_create_client(struct fdev *fdev, const struct afb_proto_ws_client_itf *itf, void *closure) { - return afb_proto_ws_create(eloop, fd, NULL, itf, closure, &proto_ws_client_ws_itf); + return afb_proto_ws_create(fdev, NULL, itf, closure, &proto_ws_client_ws_itf); } -struct afb_proto_ws *afb_proto_ws_create_server(struct sd_event *eloop, int fd, const struct afb_proto_ws_server_itf *itf, void *closure) +struct afb_proto_ws *afb_proto_ws_create_server(struct fdev *fdev, const struct afb_proto_ws_server_itf *itf, void *closure) { - return afb_proto_ws_create(eloop, fd, itf, NULL, closure, &server_ws_itf); + return afb_proto_ws_create(fdev, itf, NULL, closure, &server_ws_itf); } void afb_proto_ws_unref(struct afb_proto_ws *protows) { - if (!__atomic_sub_fetch(&protows->refcount, 1, __ATOMIC_RELAXED)) { + if (protows && !__atomic_sub_fetch(&protows->refcount, 1, __ATOMIC_RELAXED)) { afb_proto_ws_hangup(protows); afb_ws_destroy(protows->ws); pthread_mutex_destroy(&protows->mutex); @@ -1212,3 +1055,7 @@ void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangup)(void protows->on_hangup = on_hangup; } +void afb_proto_ws_set_queuing(struct afb_proto_ws *protows, int (*queuing)(void (*)(int,void*), void*)) +{ + protows->queuing = queuing; +}