X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fafb-proto-ws.c;h=ce7d75d3151916565954d994fb108be7fe7fb71f;hb=0891ef4826e347d5554c630b5c0ce73c68f76c9c;hp=90c3b05a05598742f6e8e659693161d67c57ec49;hpb=760f61ba4523d5c3461a0c30bbda78c84ab80103;p=src%2Fapp-framework-binder.git diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c index 90c3b05a..ce7d75d3 100644 --- a/src/afb-proto-ws.c +++ b/src/afb-proto-ws.c @@ -37,6 +37,7 @@ #include "afb-ws.h" #include "afb-msg-json.h" #include "afb-proto-ws.h" +#include "jobs.h" struct afb_proto_ws; @@ -190,6 +191,27 @@ struct afb_proto_ws void (*on_hangup)(void *closure); }; +/******************* streaming objects **********************************/ + +#define WRITEBUF_COUNT_MAX 32 +struct writebuf +{ + struct iovec iovec[WRITEBUF_COUNT_MAX]; + uint32_t uints[WRITEBUF_COUNT_MAX]; + int count; +}; + +struct readbuf +{ + char *base, *head, *end; +}; + +struct binary +{ + struct afb_proto_ws *protows; + struct readbuf rb; +}; + /******************* common useful tools **********************************/ /** @@ -204,19 +226,6 @@ static inline uint32_t ptr2id(void *ptr) /******************* serialisation part **********************************/ -struct readbuf -{ - char *base, *head, *end; -}; - -#define WRITEBUF_COUNT_MAX 32 -struct writebuf -{ - struct iovec iovec[WRITEBUF_COUNT_MAX]; - uint32_t uints[WRITEBUF_COUNT_MAX]; - int count; -}; - static char *readbuf_get(struct readbuf *rb, uint32_t length) { char *before = rb->head; @@ -735,54 +744,73 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf * } /* callback when receiving binary data */ -static void client_on_binary(void *closure, char *data, size_t size) +static void client_on_binary_job(int sig, void *closure) { - struct afb_proto_ws *protows; - struct readbuf rb; - - rb.base = data; - if (size > 0) { - rb.head = data; - rb.end = data + size; - protows = closure; + struct binary *binary = closure; - switch (*rb.head++) { + if (!sig) { + switch (*binary->rb.head++) { case CHAR_FOR_ANSWER_SUCCESS: /* success */ - client_on_reply_success(protows, &rb); + client_on_reply_success(binary->protows, &binary->rb); break; case CHAR_FOR_ANSWER_FAIL: /* fail */ - client_on_reply_fail(protows, &rb); + client_on_reply_fail(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_BROADCAST: /* broadcast */ - client_on_event_broadcast(protows, &rb); + client_on_event_broadcast(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_ADD: /* creates the event */ - client_on_event_create(protows, &rb); + client_on_event_create(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_DEL: /* removes the event */ - client_on_event_remove(protows, &rb); + client_on_event_remove(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_PUSH: /* pushs the event */ - client_on_event_push(protows, &rb); + client_on_event_push(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */ - client_on_event_subscribe(protows, &rb); + client_on_event_subscribe(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */ - client_on_event_unsubscribe(protows, &rb); + client_on_event_unsubscribe(binary->protows, &binary->rb); break; case CHAR_FOR_SUBCALL_CALL: /* subcall */ - client_on_subcall(protows, &rb); + client_on_subcall(binary->protows, &binary->rb); break; case CHAR_FOR_DESCRIPTION: /* description */ - client_on_description(protows, &rb); + client_on_description(binary->protows, &binary->rb); break; default: /* unexpected message */ /* TODO: close the connection */ break; } } - free(rb.base); + free(binary->rb.base); + free(binary); +} + +/* callback when receiving binary data */ +static void client_on_binary(void *closure, char *data, size_t size) +{ + int rc; + struct binary *binary; + + if (size) { + binary = malloc(sizeof *binary); + if (!binary) { + errno = ENOMEM; + } else { + binary->protows = closure; + binary->rb.base = data; + binary->rb.head = data; + binary->rb.end = data + size; + rc = jobs_queue(NULL, 0, client_on_binary_job, binary); + if (rc >= 0) + return; + free(binary); + } + } + free(data); } int afb_proto_ws_client_call( @@ -1017,33 +1045,51 @@ static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb) } /* callback when receiving binary data */ -static void server_on_binary(void *closure, char *data, size_t size) +static void server_on_binary_job(int sig, void *closure) { - struct afb_proto_ws *protows; - struct readbuf rb; - - rb.base = data; - if (size > 0) { - rb.head = data; - rb.end = data + size; - protows = closure; + struct binary *binary = closure; - switch (*rb.head++) { + if (!sig) { + switch (*binary->rb.head++) { case CHAR_FOR_CALL: - server_on_call(protows, &rb); + server_on_call(binary->protows, &binary->rb); break; case CHAR_FOR_SUBCALL_REPLY: - server_on_subcall_reply(protows, &rb); + server_on_subcall_reply(binary->protows, &binary->rb); break; case CHAR_FOR_DESCRIBE: - server_on_describe(protows, &rb); + server_on_describe(binary->protows, &binary->rb); break; default: /* unexpected message */ /* TODO: close the connection */ break; } } - free(rb.base); + free(binary->rb.base); + free(binary); +} + +static void server_on_binary(void *closure, char *data, size_t size) +{ + int rc; + struct binary *binary; + + if (size) { + binary = malloc(sizeof *binary); + if (!binary) { + errno = ENOMEM; + } else { + binary->protows = closure; + binary->rb.base = data; + binary->rb.head = data; + binary->rb.end = data + size; + rc = jobs_queue(NULL, 0, server_on_binary_job, binary); + if (rc >= 0) + return; + free(binary); + } + } + free(data); } /******************* server part: manage events **********************************/