X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fafb-proto-ws.c;h=142afa98b5ed4a58ffae298ad3171bd82e5e29db;hb=95ad0012182b5f32cc1dd9843304b58d263a7de0;hp=7644a96aa1a3f447d50cda90e28275ccb555939e;hpb=12ec841c28f8f795b49466cc377e64db3146430d;p=src%2Fapp-framework-binder.git diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c index 7644a96a..142afa98 100644 --- a/src/afb-proto-ws.c +++ b/src/afb-proto-ws.c @@ -158,6 +158,9 @@ struct afb_proto_ws /* on hangup callback */ void (*on_hangup)(void *closure); + + /* queuing facility for processing messages */ + int (*queuing)(void (*process)(int s, void *c), void *closure); }; /******************* streaming objects **********************************/ @@ -325,6 +328,32 @@ static int writebuf_object(struct writebuf *wb, struct json_object *object) return string != NULL && writebuf_string(wb, string); } +/******************* queuing of messages *****************/ + +/* queue the processing of the received message (except if size=0 cause it's not a valid message) */ +static void queue_message_processing(struct afb_proto_ws *protows, char *data, size_t size, void (*processing)(int,void*)) +{ + struct binary *binary; + + if (size) { + binary = malloc(sizeof *binary); + if (!binary) { + /* TODO process the problem */ + errno = ENOMEM; + } else { + binary->protows = protows; + binary->rb.base = data; + binary->rb.head = data; + binary->rb.end = data + size; + if (!protows->queuing + || protows->queuing(processing, binary) < 0) + processing(0, binary); + return; + } + } + free(data); +} + /******************* ws request part for server *****************/ void afb_proto_ws_call_addref(struct afb_proto_ws_call *call) @@ -624,25 +653,9 @@ static void client_on_binary_job(int sig, void *closure) /* callback when receiving binary data */ static void client_on_binary(void *closure, char *data, size_t size) { - int rc; - struct binary *binary; + struct afb_proto_ws *protows = closure; - if (size) { - binary = malloc(sizeof *binary); - if (!binary) { - errno = ENOMEM; - } else { - binary->protows = closure; - binary->rb.base = data; - binary->rb.head = data; - binary->rb.end = data + size; - rc = jobs_queue(NULL, 0, client_on_binary_job, binary); - if (rc >= 0) - return; - free(binary); - } - } - free(data); + queue_message_processing(protows, data, size, client_on_binary_job); } int afb_proto_ws_client_call( @@ -870,25 +883,9 @@ static void server_on_binary_job(int sig, void *closure) static void server_on_binary(void *closure, char *data, size_t size) { - int rc; - struct binary *binary; + struct afb_proto_ws *protows = closure; - if (size) { - binary = malloc(sizeof *binary); - if (!binary) { - errno = ENOMEM; - } else { - binary->protows = closure; - binary->rb.base = data; - binary->rb.head = data; - binary->rb.end = data + size; - rc = jobs_queue(NULL, 0, server_on_binary_job, binary); - if (rc >= 0) - return; - free(binary); - } - } - free(data); + queue_message_processing(protows, data, size, server_on_binary_job); } /******************* server part: manage events **********************************/ @@ -1047,3 +1044,7 @@ void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangup)(void protows->on_hangup = on_hangup; } +void afb_proto_ws_set_queuing(struct afb_proto_ws *protows, int (*queuing)(void (*)(int,void*), void*)) +{ + protows->queuing = queuing; +}