X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fafb-proto-ws.c;h=89d4522c1ad4781671460c30a2cf3c9da16d2df4;hb=e36f8d572ca660f5c06fe45297d13c7a6818cc65;hp=f9313545b03bf5f871fa763447fadb1f68a87586;hpb=4521c1e7ae5371ab9d639adc617d17fb4e8ded0c;p=src%2Fapp-framework-binder.git diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c index f9313545..89d4522c 100644 --- a/src/afb-proto-ws.c +++ b/src/afb-proto-ws.c @@ -17,8 +17,6 @@ #define _GNU_SOURCE -#define NO_PLUGIN_VERBOSE_MACRO - #include #include #include @@ -91,7 +89,7 @@ For the purpose of handling events the server can: struct client_call { struct client_call *next; /* the next call */ struct afb_proto_ws *protows; /* the proto_ws */ - void *request; + void *request; /* the request closure */ uint32_t callid; /* the message identifier */ }; @@ -160,6 +158,9 @@ struct afb_proto_ws /* on hangup callback */ void (*on_hangup)(void *closure); + + /* queuing facility for processing messages */ + int (*queuing)(void (*process)(int s, void *c), void *closure); }; /******************* streaming objects **********************************/ @@ -260,10 +261,11 @@ static int readbuf_object(struct readbuf *rb, struct json_object **object) { const char *string; struct json_object *o; + enum json_tokener_error jerr; int rc = readbuf_string(rb, &string, NULL); if (rc) { - o = json_tokener_parse(string); - if (o == NULL && strcmp(string, "null")) + o = json_tokener_parse_verbose(string, &jerr); + if (jerr != json_tokener_success) o = json_object_new_string(string); *object = o; } @@ -327,6 +329,32 @@ static int writebuf_object(struct writebuf *wb, struct json_object *object) return string != NULL && writebuf_string(wb, string); } +/******************* queuing of messages *****************/ + +/* queue the processing of the received message (except if size=0 cause it's not a valid message) */ +static void queue_message_processing(struct afb_proto_ws *protows, char *data, size_t size, void (*processing)(int,void*)) +{ + struct binary *binary; + + if (size) { + binary = malloc(sizeof *binary); + if (!binary) { + /* TODO process the problem */ + errno = ENOMEM; + } else { + binary->protows = protows; + binary->rb.base = data; + binary->rb.head = data; + binary->rb.end = data + size; + if (!protows->queuing + || protows->queuing(processing, binary) < 0) + processing(0, binary); + return; + } + } + free(data); +} + /******************* ws request part for server *****************/ void afb_proto_ws_call_addref(struct afb_proto_ws_call *call) @@ -626,25 +654,9 @@ static void client_on_binary_job(int sig, void *closure) /* callback when receiving binary data */ static void client_on_binary(void *closure, char *data, size_t size) { - int rc; - struct binary *binary; + struct afb_proto_ws *protows = closure; - if (size) { - binary = malloc(sizeof *binary); - if (!binary) { - errno = ENOMEM; - } else { - binary->protows = closure; - binary->rb.base = data; - binary->rb.head = data; - binary->rb.end = data + size; - rc = jobs_queue(NULL, 0, client_on_binary_job, binary); - if (rc >= 0) - return; - free(binary); - } - } - free(data); + queue_message_processing(protows, data, size, client_on_binary_job); } int afb_proto_ws_client_call( @@ -872,25 +884,9 @@ static void server_on_binary_job(int sig, void *closure) static void server_on_binary(void *closure, char *data, size_t size) { - int rc; - struct binary *binary; + struct afb_proto_ws *protows = closure; - if (size) { - binary = malloc(sizeof *binary); - if (!binary) { - errno = ENOMEM; - } else { - binary->protows = closure; - binary->rb.base = data; - binary->rb.head = data; - binary->rb.end = data + size; - rc = jobs_queue(NULL, 0, server_on_binary_job, binary); - if (rc >= 0) - return; - free(binary); - } - } - free(data); + queue_message_processing(protows, data, size, server_on_binary_job); } /******************* server part: manage events **********************************/ @@ -940,8 +936,18 @@ static void on_hangup(void *closure) { struct afb_proto_ws *protows = closure; struct client_describe *cd, *ncd; + struct client_call *call, *ncall; + + ncd = __atomic_exchange_n(&protows->describes, NULL, __ATOMIC_RELAXED); + ncall = __atomic_exchange_n(&protows->calls, NULL, __ATOMIC_RELAXED); + + while (ncall) { + call= ncall; + ncall = call->next; + protows->client_itf->on_reply(protows->closure, call->request, NULL, "disconnected", "server hung up"); + free(call); + } - ncd = protows->describes; while (ncd) { cd= ncd; ncd = cd->next; @@ -1049,3 +1055,7 @@ void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangup)(void protows->on_hangup = on_hangup; } +void afb_proto_ws_set_queuing(struct afb_proto_ws *protows, int (*queuing)(void (*)(int,void*), void*)) +{ + protows->queuing = queuing; +}