X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fafb-proto-ws.c;h=21ba2bca4de4442d61603dc5eb89bbed5a18daaf;hb=ff5446ec917b5f50333f2bee17ccfdf20eb99fac;hp=f9313545b03bf5f871fa763447fadb1f68a87586;hpb=4521c1e7ae5371ab9d639adc617d17fb4e8ded0c;p=src%2Fapp-framework-binder.git diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c index f9313545..21ba2bca 100644 --- a/src/afb-proto-ws.c +++ b/src/afb-proto-ws.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015-2018 "IoT.bzh" + * Copyright (C) 2015-2019 "IoT.bzh" * Author José Bollo * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,8 +17,6 @@ #define _GNU_SOURCE -#define NO_PLUGIN_VERBOSE_MACRO - #include #include #include @@ -37,8 +35,8 @@ #include "afb-ws.h" #include "afb-msg-json.h" #include "afb-proto-ws.h" -#include "jobs.h" #include "fdev.h" +#include "verbose.h" struct afb_proto_ws; @@ -91,7 +89,7 @@ For the purpose of handling events the server can: struct client_call { struct client_call *next; /* the next call */ struct afb_proto_ws *protows; /* the proto_ws */ - void *request; + void *request; /* the request closure */ uint32_t callid; /* the message identifier */ }; @@ -134,9 +132,6 @@ struct afb_proto_ws /* count of references */ int refcount; - /* file descriptor */ - struct fdev *fdev; - /* resource control */ pthread_mutex_t mutex; @@ -160,16 +155,21 @@ struct afb_proto_ws /* on hangup callback */ void (*on_hangup)(void *closure); + + /* queuing facility for processing messages */ + int (*queuing)(struct afb_proto_ws *proto, void (*process)(int s, void *c), void *closure); }; /******************* streaming objects **********************************/ -#define WRITEBUF_COUNT_MAX 32 +#define WRITEBUF_COUNT_MAX 32 +#define WRITEBUF_BUFSZ (WRITEBUF_COUNT_MAX * sizeof(uint32_t)) + struct writebuf { + int iovcount, bufcount; struct iovec iovec[WRITEBUF_COUNT_MAX]; - uint32_t uints[WRITEBUF_COUNT_MAX]; - int count; + char buf[WRITEBUF_BUFSZ]; }; struct readbuf @@ -260,10 +260,11 @@ static int readbuf_object(struct readbuf *rb, struct json_object **object) { const char *string; struct json_object *o; + enum json_tokener_error jerr; int rc = readbuf_string(rb, &string, NULL); if (rc) { - o = json_tokener_parse(string); - if (o == NULL && strcmp(string, "null")) + o = json_tokener_parse_verbose(string, &jerr); + if (jerr != json_tokener_success) o = json_object_new_string(string); *object = o; } @@ -272,37 +273,54 @@ static int readbuf_object(struct readbuf *rb, struct json_object **object) static int writebuf_put(struct writebuf *wb, const void *value, size_t length) { - int i = wb->count; + int i = wb->iovcount; if (i == WRITEBUF_COUNT_MAX) return 0; wb->iovec[i].iov_base = (void*)value; wb->iovec[i].iov_len = length; - wb->count = i + 1; + wb->iovcount = i + 1; return 1; } -static int writebuf_char(struct writebuf *wb, char value) +static int writebuf_putbuf(struct writebuf *wb, const void *value, int length) { - int i = wb->count; - if (i == WRITEBUF_COUNT_MAX) + char *p; + int i = wb->iovcount, n = wb->bufcount, nafter; + + /* check enough length */ + nafter = n + length; + if (nafter > WRITEBUF_BUFSZ) + return 0; + + /* get where to store */ + p = &wb->buf[n]; + if (i && p == (((char*)wb->iovec[i - 1].iov_base) + wb->iovec[i - 1].iov_len)) + /* increase previous iovec */ + wb->iovec[i - 1].iov_len += (size_t)length; + else if (i == WRITEBUF_COUNT_MAX) + /* no more iovec */ return 0; - *(char*)&wb->uints[i] = value; - wb->iovec[i].iov_base = &wb->uints[i]; - wb->iovec[i].iov_len = 1; - wb->count = i + 1; + else { + /* new iovec */ + wb->iovec[i].iov_base = p; + wb->iovec[i].iov_len = (size_t)length; + wb->iovcount = i + 1; + } + /* store now */ + memcpy(p, value, (size_t)length); + wb->bufcount = nafter; return 1; } +static int writebuf_char(struct writebuf *wb, char value) +{ + return writebuf_putbuf(wb, &value, 1); +} + static int writebuf_uint32(struct writebuf *wb, uint32_t value) { - int i = wb->count; - if (i == WRITEBUF_COUNT_MAX) - return 0; - wb->uints[i] = htole32(value); - wb->iovec[i].iov_base = &wb->uints[i]; - wb->iovec[i].iov_len = sizeof wb->uints[i]; - wb->count = i + 1; - return 1; + value = htole32(value); + return writebuf_putbuf(wb, &value, (int)sizeof value); } static int writebuf_string_length(struct writebuf *wb, const char *value, size_t length) @@ -327,6 +345,53 @@ static int writebuf_object(struct writebuf *wb, struct json_object *object) return string != NULL && writebuf_string(wb, string); } +/******************* queuing of messages *****************/ + +/* queue the processing of the received message (except if size=0 cause it's not a valid message) */ +static void queue_message_processing(struct afb_proto_ws *protows, char *data, size_t size, void (*processing)(int,void*)) +{ + struct binary *binary; + + if (size) { + binary = malloc(sizeof *binary); + if (!binary) { + /* TODO process the problem */ + errno = ENOMEM; + } else { + binary->protows = protows; + binary->rb.base = data; + binary->rb.head = data; + binary->rb.end = data + size; + if (!protows->queuing + || protows->queuing(protows, processing, binary) < 0) + processing(0, binary); + return; + } + } + free(data); +} + +/******************* sending messages *****************/ + +static int proto_write(struct afb_proto_ws *protows, struct writebuf *wb) +{ + int rc; + struct afb_ws *ws; + + pthread_mutex_lock(&protows->mutex); + ws = protows->ws; + if (ws == NULL) { + errno = EPIPE; + rc = -1; + } else { + rc = afb_ws_binary_v(ws, wb->iovec, wb->iovcount); + if (rc > 0) + rc = 0; + } + pthread_mutex_unlock(&protows->mutex); + return rc; +} + /******************* ws request part for server *****************/ void afb_proto_ws_call_addref(struct afb_proto_ws_call *call) @@ -347,67 +412,43 @@ void afb_proto_ws_call_unref(struct afb_proto_ws_call *call) int afb_proto_ws_call_reply(struct afb_proto_ws_call *call, struct json_object *obj, const char *error, const char *info) { int rc = -1; - struct writebuf wb = { .count = 0 }; + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_REPLY) && writebuf_uint32(&wb, call->callid) && writebuf_nullstring(&wb, error) && writebuf_nullstring(&wb, info) - && writebuf_object(&wb, obj)) { - pthread_mutex_lock(&protows->mutex); - rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); - pthread_mutex_unlock(&protows->mutex); - if (rc >= 0) { - rc = 0; - goto success; - } - } -success: + && writebuf_object(&wb, obj)) + rc = proto_write(protows, &wb); return rc; } int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, const char *event_name, int event_id) { int rc = -1; - struct writebuf wb = { .count = 0 }; + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_EVT_SUBSCRIBE) && writebuf_uint32(&wb, call->callid) && writebuf_uint32(&wb, (uint32_t)event_id) - && writebuf_string(&wb, event_name)) { - pthread_mutex_lock(&protows->mutex); - rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); - pthread_mutex_unlock(&protows->mutex); - if (rc >= 0) { - rc = 0; - goto success; - } - } -success: + && writebuf_string(&wb, event_name)) + rc = proto_write(protows, &wb); return rc; } int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *event_name, int event_id) { int rc = -1; - struct writebuf wb = { .count = 0 }; + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE) && writebuf_uint32(&wb, call->callid) && writebuf_uint32(&wb, (uint32_t)event_id) - && writebuf_string(&wb, event_name)) { - pthread_mutex_lock(&protows->mutex); - rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); - pthread_mutex_unlock(&protows->mutex); - if (rc >= 0) { - rc = 0; - goto success; - } - } -success: + && writebuf_string(&wb, event_name)) + rc = proto_write(protows, &wb); return rc; } @@ -487,6 +528,8 @@ static void client_on_event_create(struct afb_proto_ws *protows, struct readbuf if (protows->client_itf->on_event_create && client_msg_event_read(rb, &event_id, &event_name)) protows->client_itf->on_event_create(protows->closure, event_name, (int)event_id); + else + ERROR("Ignoring creation of event"); } /* removes an event */ @@ -497,6 +540,8 @@ static void client_on_event_remove(struct afb_proto_ws *protows, struct readbuf if (protows->client_itf->on_event_remove && client_msg_event_read(rb, &event_id, &event_name)) protows->client_itf->on_event_remove(protows->closure, event_name, (int)event_id); + else + ERROR("Ignoring deletion of event"); } /* subscribes an event */ @@ -508,6 +553,8 @@ static void client_on_event_subscribe(struct afb_proto_ws *protows, struct readb if (protows->client_itf->on_event_subscribe && client_msg_call_get(protows, rb, &call) && client_msg_event_read(rb, &event_id, &event_name)) protows->client_itf->on_event_subscribe(protows->closure, call->request, event_name, (int)event_id); + else + ERROR("Ignoring subscription to event"); } /* unsubscribes an event */ @@ -519,16 +566,21 @@ static void client_on_event_unsubscribe(struct afb_proto_ws *protows, struct rea if (protows->client_itf->on_event_unsubscribe && client_msg_call_get(protows, rb, &call) && client_msg_event_read(rb, &event_id, &event_name)) protows->client_itf->on_event_unsubscribe(protows->closure, call->request, event_name, (int)event_id); + else + ERROR("Ignoring unsubscription to event"); } /* receives broadcasted events */ static void client_on_event_broadcast(struct afb_proto_ws *protows, struct readbuf *rb) { - const char *event_name; + const char *event_name, *uuid; + char hop; struct json_object *object; - if (protows->client_itf->on_event_broadcast && readbuf_string(rb, &event_name, NULL) && readbuf_object(rb, &object)) - protows->client_itf->on_event_broadcast(protows->closure, event_name, object); + if (protows->client_itf->on_event_broadcast && readbuf_string(rb, &event_name, NULL) && readbuf_object(rb, &object) && (uuid = readbuf_get(rb, 16)) && readbuf_char(rb, &hop)) + protows->client_itf->on_event_broadcast(protows->closure, event_name, object, (unsigned char*)uuid, (uint8_t)hop); + else + ERROR("Ignoring broadcast of event"); } /* pushs an event */ @@ -540,6 +592,8 @@ static void client_on_event_push(struct afb_proto_ws *protows, struct readbuf *r if (protows->client_itf->on_event_push && client_msg_event_read(rb, &event_id, &event_name) && readbuf_object(rb, &object)) protows->client_itf->on_event_push(protows->closure, event_name, (int)event_id, object); + else + ERROR("Ignoring push of event"); } static void client_on_reply(struct afb_proto_ws *protows, struct readbuf *rb) @@ -626,25 +680,9 @@ static void client_on_binary_job(int sig, void *closure) /* callback when receiving binary data */ static void client_on_binary(void *closure, char *data, size_t size) { - int rc; - struct binary *binary; + struct afb_proto_ws *protows = closure; - if (size) { - binary = malloc(sizeof *binary); - if (!binary) { - errno = ENOMEM; - } else { - binary->protows = closure; - binary->rb.base = data; - binary->rb.head = data; - binary->rb.end = data + size; - rc = jobs_queue(NULL, 0, client_on_binary_job, binary); - if (rc >= 0) - return; - free(binary); - } - } - free(data); + queue_message_processing(protows, data, size, client_on_binary_job); } int afb_proto_ws_client_call( @@ -658,7 +696,7 @@ int afb_proto_ws_client_call( { int rc = -1; struct client_call *call; - struct writebuf wb = { .count = 0 }; + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; /* allocate call data */ call = malloc(sizeof *call); @@ -690,13 +728,9 @@ int afb_proto_ws_client_call( } /* send */ - pthread_mutex_lock(&protows->mutex); - rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); - pthread_mutex_unlock(&protows->mutex); - if (rc >= 0) { - rc = 0; + rc = proto_write(protows, &wb); + if (!rc) goto end; - } clean: client_call_destroy(call); @@ -708,7 +742,7 @@ end: int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(void*, struct json_object*), void *closure) { struct client_describe *desc, *d; - struct writebuf wb = { .count = 0 }; + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; desc = malloc(sizeof *desc); if (!desc) { @@ -737,7 +771,8 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)( /* send */ if (writebuf_char(&wb, CHAR_FOR_DESCRIBE) && writebuf_uint32(&wb, desc->descid) - && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0) { + && protows->ws != NULL + && afb_ws_binary_v(protows->ws, wb.iovec, wb.iovcount) >= 0) { pthread_mutex_unlock(&protows->mutex); return 0; } @@ -802,19 +837,14 @@ overflow: static int server_send_description(struct afb_proto_ws *protows, uint32_t descid, struct json_object *descobj) { - int rc; - struct writebuf wb = { .count = 0 }; + int rc = -1; + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; if (writebuf_char(&wb, CHAR_FOR_DESCRIPTION) && writebuf_uint32(&wb, descid) - && writebuf_object(&wb, descobj)) { - pthread_mutex_lock(&protows->mutex); - rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); - pthread_mutex_unlock(&protows->mutex); - if (rc >= 0) - return 0; - } - return -1; + && writebuf_object(&wb, descobj)) + rc = proto_write(protows, &wb); + return rc; } int afb_proto_ws_describe_put(struct afb_proto_ws_describe *describe, struct json_object *description) @@ -872,45 +902,24 @@ static void server_on_binary_job(int sig, void *closure) static void server_on_binary(void *closure, char *data, size_t size) { - int rc; - struct binary *binary; + struct afb_proto_ws *protows = closure; - if (size) { - binary = malloc(sizeof *binary); - if (!binary) { - errno = ENOMEM; - } else { - binary->protows = closure; - binary->rb.base = data; - binary->rb.head = data; - binary->rb.end = data + size; - rc = jobs_queue(NULL, 0, server_on_binary_job, binary); - if (rc >= 0) - return; - free(binary); - } - } - free(data); + queue_message_processing(protows, data, size, server_on_binary_job); } /******************* server part: manage events **********************************/ static int server_event_send(struct afb_proto_ws *protows, char order, const char *event_name, int event_id, struct json_object *data) { - struct writebuf wb = { .count = 0 }; - int rc; + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; + int rc = -1; if (writebuf_char(&wb, order) - && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id)) + && writebuf_uint32(&wb, event_id) && writebuf_string(&wb, event_name) - && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data))) { - pthread_mutex_lock(&protows->mutex); - rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); - pthread_mutex_unlock(&protows->mutex); - if (rc >= 0) - return 0; - } - return -1; + && (order != CHAR_FOR_EVT_PUSH || writebuf_object(&wb, data))) + rc = proto_write(protows, &wb); + return rc; } int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, const char *event_name, int event_id) @@ -928,9 +937,21 @@ int afb_proto_ws_server_event_push(struct afb_proto_ws *protows, const char *eve return server_event_send(protows, CHAR_FOR_EVT_PUSH, event_name, event_id, data); } -int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char *event_name, struct json_object *data) +int afb_proto_ws_server_event_broadcast(struct afb_proto_ws *protows, const char *event_name, struct json_object *data, const unsigned char uuid[16], uint8_t hop) { - return server_event_send(protows, CHAR_FOR_EVT_BROADCAST, event_name, 0, data); + struct writebuf wb = { .iovcount = 0, .bufcount = 0 }; + int rc = -1; + + if (!hop) + return 0; + + if (writebuf_char(&wb, CHAR_FOR_EVT_BROADCAST) + && writebuf_string(&wb, event_name) + && writebuf_object(&wb, data) + && writebuf_put(&wb, uuid, 16) + && writebuf_char(&wb, (char)(hop - 1))) + rc = proto_write(protows, &wb); + return rc; } /*****************************************************/ @@ -940,8 +961,25 @@ static void on_hangup(void *closure) { struct afb_proto_ws *protows = closure; struct client_describe *cd, *ncd; + struct client_call *call, *ncall; + struct afb_ws *ws; + pthread_mutex_lock(&protows->mutex); ncd = protows->describes; + protows->describes = NULL; + ncall = protows->calls; + protows->calls = NULL; + ws = protows->ws; + protows->ws = NULL; + pthread_mutex_unlock(&protows->mutex); + + while (ncall) { + call= ncall; + ncall = call->next; + protows->client_itf->on_reply(protows->closure, call->request, NULL, "disconnected", "server hung up"); + free(call); + } + while (ncd) { cd= ncd; ncd = cd->next; @@ -949,9 +987,8 @@ static void on_hangup(void *closure) free(cd); } - if (protows->fdev) { - fdev_unref(protows->fdev); - protows->fdev = 0; + if (ws) { + afb_ws_destroy(ws); if (protows->on_hangup) protows->on_hangup(protows->closure); } @@ -991,7 +1028,6 @@ static struct afb_proto_ws *afb_proto_ws_create(struct fdev *fdev, const struct fcntl(fdev_fd(fdev), F_SETFL, O_NONBLOCK); protows->ws = afb_ws_create(fdev, itf, protows); if (protows->ws != NULL) { - protows->fdev = fdev; protows->refcount = 1; protows->closure = closure; protows->server_itf = itfs; @@ -1016,9 +1052,8 @@ struct afb_proto_ws *afb_proto_ws_create_server(struct fdev *fdev, const struct void afb_proto_ws_unref(struct afb_proto_ws *protows) { - if (!__atomic_sub_fetch(&protows->refcount, 1, __ATOMIC_RELAXED)) { + if (protows && !__atomic_sub_fetch(&protows->refcount, 1, __ATOMIC_RELAXED)) { afb_proto_ws_hangup(protows); - afb_ws_destroy(protows->ws); pthread_mutex_destroy(&protows->mutex); free(protows); } @@ -1041,7 +1076,8 @@ int afb_proto_ws_is_server(struct afb_proto_ws *protows) void afb_proto_ws_hangup(struct afb_proto_ws *protows) { - afb_ws_hangup(protows->ws); + if (protows->ws) + afb_ws_hangup(protows->ws); } void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangup)(void *closure)) @@ -1049,3 +1085,7 @@ void afb_proto_ws_on_hangup(struct afb_proto_ws *protows, void (*on_hangup)(void protows->on_hangup = on_hangup; } +void afb_proto_ws_set_queuing(struct afb_proto_ws *protows, int (*queuing)(struct afb_proto_ws*, void (*)(int,void*), void*)) +{ + protows->queuing = queuing; +}