X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fafb-proto-ws.c;h=56669e59411059017482599c6cbda039882e3dad;hb=ac7a95223a6314cca6250495ea59c3cf7e46e89e;hp=1f17cd77adfd9b37e223fa3cc0fa83c84c5f3404;hpb=159c895986f2358d6df8bb5804cc4419cc6b457f;p=src%2Fapp-framework-binder.git diff --git a/src/afb-proto-ws.c b/src/afb-proto-ws.c index 1f17cd77..56669e59 100644 --- a/src/afb-proto-ws.c +++ b/src/afb-proto-ws.c @@ -33,14 +33,12 @@ #include #include -#include - -#include "afb-common.h" #include "afb-ws.h" #include "afb-msg-json.h" #include "afb-proto-ws.h" -#include "verbose.h" +#include "jobs.h" +#include "fdev.h" struct afb_proto_ws; @@ -55,7 +53,7 @@ The client can require the following actions: - ask for description -The server must reply to the previous actions by +The server must reply to the previous actions by - answering success or failure of the call @@ -164,7 +162,7 @@ struct afb_proto_ws int refcount; /* file descriptor */ - int fd; + struct fdev *fdev; /* resource control */ pthread_mutex_t mutex; @@ -194,6 +192,27 @@ struct afb_proto_ws void (*on_hangup)(void *closure); }; +/******************* streaming objects **********************************/ + +#define WRITEBUF_COUNT_MAX 32 +struct writebuf +{ + struct iovec iovec[WRITEBUF_COUNT_MAX]; + uint32_t uints[WRITEBUF_COUNT_MAX]; + int count; +}; + +struct readbuf +{ + char *base, *head, *end; +}; + +struct binary +{ + struct afb_proto_ws *protows; + struct readbuf rb; +}; + /******************* common useful tools **********************************/ /** @@ -208,19 +227,6 @@ static inline uint32_t ptr2id(void *ptr) /******************* serialisation part **********************************/ -struct readbuf -{ - char *base, *head, *end; -}; - -#define WRITEBUF_COUNT_MAX 32 -struct writebuf -{ - struct iovec iovec[WRITEBUF_COUNT_MAX]; - uint32_t uints[WRITEBUF_COUNT_MAX]; - int count; -}; - static char *readbuf_get(struct readbuf *rb, uint32_t length) { char *before = rb->head; @@ -347,18 +353,20 @@ int afb_proto_ws_call_success(struct afb_proto_ws_call *call, struct json_object { int rc = -1; struct writebuf wb = { .count = 0 }; + struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_ANSWER_SUCCESS) && writebuf_uint32(&wb, call->callid) && writebuf_string(&wb, info ?: "") && writebuf_object(&wb, obj)) { - rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count); + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); if (rc >= 0) { rc = 0; goto success; } } - ERROR("error while sending success"); success: return rc; } @@ -367,18 +375,20 @@ int afb_proto_ws_call_fail(struct afb_proto_ws_call *call, const char *status, c { int rc = -1; struct writebuf wb = { .count = 0 }; + struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_ANSWER_FAIL) && writebuf_uint32(&wb, call->callid) && writebuf_string(&wb, status) && writebuf_string(&wb, info ? : "")) { - rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count); + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); if (rc >= 0) { rc = 0; goto success; } } - ERROR("error while sending fail"); success: return rc; } @@ -397,7 +407,7 @@ int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, c sc->callback = callback; sc->closure = cb_closure; - pthread_mutex_unlock(&protows->mutex); + pthread_mutex_lock(&protows->mutex); sc->subcallid = ptr2id(sc); do { sc->subcallid++; @@ -415,14 +425,15 @@ int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, c && writebuf_string(&wb, api) && writebuf_string(&wb, verb) && writebuf_object(&wb, args)) { + pthread_mutex_lock(&protows->mutex); rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); if (rc >= 0) { rc = 0; goto success; } } } - ERROR("error while sending subcall"); success: return rc; } @@ -431,18 +442,20 @@ int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, const char *even { int rc = -1; struct writebuf wb = { .count = 0 }; + struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_EVT_SUBSCRIBE) && writebuf_uint32(&wb, call->callid) && writebuf_uint32(&wb, (uint32_t)event_id) && writebuf_string(&wb, event_name)) { - rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count); + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); if (rc >= 0) { rc = 0; goto success; } } - ERROR("error while subscribing event"); success: return rc; } @@ -451,18 +464,20 @@ int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *ev { int rc = -1; struct writebuf wb = { .count = 0 }; + struct afb_proto_ws *protows = call->protows; if (writebuf_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE) && writebuf_uint32(&wb, call->callid) && writebuf_uint32(&wb, (uint32_t)event_id) && writebuf_string(&wb, event_name)) { - rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count); + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); if (rc >= 0) { rc = 0; goto success; } } - ERROR("error while subscribing event"); success: return rc; } @@ -470,7 +485,7 @@ success: /******************* client part **********************************/ /* search a memorized call */ -static struct client_call *client_call_search(struct afb_proto_ws *protows, uint32_t callid) +static struct client_call *client_call_search_locked(struct afb_proto_ws *protows, uint32_t callid) { struct client_call *call; @@ -481,11 +496,23 @@ static struct client_call *client_call_search(struct afb_proto_ws *protows, uint return call; } +static struct client_call *client_call_search_unlocked(struct afb_proto_ws *protows, uint32_t callid) +{ + struct client_call *result; + + pthread_mutex_lock(&protows->mutex); + result = client_call_search_locked(protows, callid); + pthread_mutex_unlock(&protows->mutex); + return result; +} + /* free and release the memorizing call */ static void client_call_destroy(struct client_call *call) { struct client_call **prv; + struct afb_proto_ws *protows = call->protows; + pthread_mutex_lock(&protows->mutex); prv = &call->protows->calls; while (*prv != NULL) { if (*prv == call) { @@ -494,6 +521,7 @@ static void client_call_destroy(struct client_call *call) } prv = &(*prv)->next; } + pthread_mutex_unlock(&protows->mutex); free(call); } @@ -510,14 +538,12 @@ static int client_msg_call_get(struct afb_proto_ws *protows, struct readbuf *rb, /* get event data from the message */ if (!readbuf_uint32(rb, &callid)) { - ERROR("Invalid message"); return 0; } /* get the call */ - *call = client_call_search(protows, callid); + *call = client_call_search_unlocked(protows, callid); if (*call == NULL) { - ERROR("message not found"); return 0; } @@ -611,6 +637,7 @@ static void client_on_reply_fail(struct afb_proto_ws *protows, struct readbuf *r if (!client_msg_call_get(protows, rb, &call)) return; + if (readbuf_string(rb, &status, NULL) && readbuf_string(rb, &info, NULL)) { protows->client_itf->on_reply_fail(protows->closure, call->request, status, info); @@ -625,12 +652,19 @@ static int client_send_subcall_reply(struct afb_proto_ws *protows, uint32_t subc { struct writebuf wb = { .count = 0 }; char ie = status < 0; + int rc; - return -!(writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY) + if (writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY) && writebuf_uint32(&wb, subcallid) && writebuf_char(&wb, ie) - && writebuf_object(&wb, object) - && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0); + && writebuf_object(&wb, object)) { + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); + if (rc >= 0) + return 0; + } + return -1; } /* callback for subcall reply */ @@ -693,11 +727,15 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf * struct json_object *object; if (readbuf_uint32(rb, &descid)) { + pthread_mutex_lock(&protows->mutex); prv = &protows->describes; while ((desc = *prv) && desc->descid != descid) prv = &desc->next; - if (desc) { + if (!desc) + pthread_mutex_unlock(&protows->mutex); + else { *prv = desc->next; + pthread_mutex_unlock(&protows->mutex); if (!readbuf_object(rb, &object)) object = NULL; desc->callback(desc->closure, object); @@ -707,56 +745,73 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf * } /* callback when receiving binary data */ -static void client_on_binary(void *closure, char *data, size_t size) +static void client_on_binary_job(int sig, void *closure) { - struct afb_proto_ws *protows; - struct readbuf rb; - - rb.base = data; - if (size > 0) { - rb.head = data; - rb.end = data + size; - protows = closure; + struct binary *binary = closure; - pthread_mutex_lock(&protows->mutex); - switch (*rb.head++) { + if (!sig) { + switch (*binary->rb.head++) { case CHAR_FOR_ANSWER_SUCCESS: /* success */ - client_on_reply_success(protows, &rb); + client_on_reply_success(binary->protows, &binary->rb); break; case CHAR_FOR_ANSWER_FAIL: /* fail */ - client_on_reply_fail(protows, &rb); + client_on_reply_fail(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_BROADCAST: /* broadcast */ - client_on_event_broadcast(protows, &rb); + client_on_event_broadcast(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_ADD: /* creates the event */ - client_on_event_create(protows, &rb); + client_on_event_create(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_DEL: /* removes the event */ - client_on_event_remove(protows, &rb); + client_on_event_remove(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_PUSH: /* pushs the event */ - client_on_event_push(protows, &rb); + client_on_event_push(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */ - client_on_event_subscribe(protows, &rb); + client_on_event_subscribe(binary->protows, &binary->rb); break; case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */ - client_on_event_unsubscribe(protows, &rb); + client_on_event_unsubscribe(binary->protows, &binary->rb); break; case CHAR_FOR_SUBCALL_CALL: /* subcall */ - client_on_subcall(protows, &rb); + client_on_subcall(binary->protows, &binary->rb); break; case CHAR_FOR_DESCRIPTION: /* description */ - client_on_description(protows, &rb); + client_on_description(binary->protows, &binary->rb); break; default: /* unexpected message */ /* TODO: close the connection */ break; } - pthread_mutex_unlock(&protows->mutex); } - free(rb.base); + free(binary->rb.base); + free(binary); +} + +/* callback when receiving binary data */ +static void client_on_binary(void *closure, char *data, size_t size) +{ + int rc; + struct binary *binary; + + if (size) { + binary = malloc(sizeof *binary); + if (!binary) { + errno = ENOMEM; + } else { + binary->protows = closure; + binary->rb.base = data; + binary->rb.head = data; + binary->rb.end = data + size; + rc = jobs_queue(NULL, 0, client_on_binary_job, binary); + if (rc >= 0) + return; + free(binary); + } + } + free(data); } int afb_proto_ws_client_call( @@ -782,11 +837,12 @@ int afb_proto_ws_client_call( /* init call data */ pthread_mutex_lock(&protows->mutex); call->callid = ptr2id(call); - while(client_call_search(protows, call->callid) != NULL) + while(client_call_search_locked(protows, call->callid) != NULL) call->callid++; call->protows = protows; call->next = protows->calls; protows->calls = call; + pthread_mutex_unlock(&protows->mutex); /* creates the call message */ if (!writebuf_char(&wb, CHAR_FOR_CALL) @@ -799,7 +855,9 @@ int afb_proto_ws_client_call( } /* send */ + pthread_mutex_lock(&protows->mutex); rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); if (rc >= 0) { rc = 0; goto end; @@ -808,7 +866,6 @@ int afb_proto_ws_client_call( clean: client_call_destroy(call); end: - pthread_mutex_unlock(&protows->mutex); return rc; } @@ -841,15 +898,15 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)( desc->protows = protows; desc->next = protows->describes; protows->describes = desc; - pthread_mutex_unlock(&protows->mutex); /* send */ if (writebuf_char(&wb, CHAR_FOR_DESCRIBE) && writebuf_uint32(&wb, desc->descid) - && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0) + && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0) { + pthread_mutex_unlock(&protows->mutex); return 0; + } - pthread_mutex_lock(&protows->mutex); d = protows->describes; if (d == desc) protows->describes = desc->next; @@ -859,8 +916,8 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)( if (d) d->next = desc->next; } - free(desc); pthread_mutex_unlock(&protows->mutex); + free(desc); error: /* TODO? callback(closure, NULL); */ return -1; @@ -942,18 +999,25 @@ static void server_on_subcall_reply(struct afb_proto_ws *protows, struct readbuf static int server_send_description(struct afb_proto_ws *protows, uint32_t descid, struct json_object *descobj) { + int rc; struct writebuf wb = { .count = 0 }; - return -!(writebuf_char(&wb, CHAR_FOR_DESCRIPTION) - && writebuf_uint32(&wb, descid) - && writebuf_object(&wb, descobj) - && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0); + if (writebuf_char(&wb, CHAR_FOR_DESCRIPTION) + && writebuf_uint32(&wb, descid) + && writebuf_object(&wb, descobj)) { + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); + if (rc >= 0) + return 0; + } + return -1; } int afb_proto_ws_describe_put(struct afb_proto_ws_describe *describe, struct json_object *description) { int rc = server_send_description(describe->protows, describe->descid, description); - afb_proto_ws_addref(describe->protows); + afb_proto_ws_unref(describe->protows); free(describe); return rc; } @@ -982,33 +1046,51 @@ static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb) } /* callback when receiving binary data */ -static void server_on_binary(void *closure, char *data, size_t size) +static void server_on_binary_job(int sig, void *closure) { - struct afb_proto_ws *protows; - struct readbuf rb; - - rb.base = data; - if (size > 0) { - rb.head = data; - rb.end = data + size; - protows = closure; + struct binary *binary = closure; - switch (*rb.head++) { + if (!sig) { + switch (*binary->rb.head++) { case CHAR_FOR_CALL: - server_on_call(protows, &rb); + server_on_call(binary->protows, &binary->rb); break; case CHAR_FOR_SUBCALL_REPLY: - server_on_subcall_reply(protows, &rb); + server_on_subcall_reply(binary->protows, &binary->rb); break; case CHAR_FOR_DESCRIBE: - server_on_describe(protows, &rb); + server_on_describe(binary->protows, &binary->rb); break; default: /* unexpected message */ /* TODO: close the connection */ break; } } - free(rb.base); + free(binary->rb.base); + free(binary); +} + +static void server_on_binary(void *closure, char *data, size_t size) +{ + int rc; + struct binary *binary; + + if (size) { + binary = malloc(sizeof *binary); + if (!binary) { + errno = ENOMEM; + } else { + binary->protows = closure; + binary->rb.base = data; + binary->rb.head = data; + binary->rb.end = data + size; + rc = jobs_queue(NULL, 0, server_on_binary_job, binary); + if (rc >= 0) + return; + free(binary); + } + } + free(data); } /******************* server part: manage events **********************************/ @@ -1016,12 +1098,19 @@ static void server_on_binary(void *closure, char *data, size_t size) static int server_event_send(struct afb_proto_ws *protows, char order, const char *event_name, int event_id, struct json_object *data) { struct writebuf wb = { .count = 0 }; + int rc; - return -!(writebuf_char(&wb, order) - && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id)) - && writebuf_string(&wb, event_name) - && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data)) - && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0); + if (writebuf_char(&wb, order) + && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id)) + && writebuf_string(&wb, event_name) + && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data))) { + pthread_mutex_lock(&protows->mutex); + rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count); + pthread_mutex_unlock(&protows->mutex); + if (rc >= 0) + return 0; + } + return -1; } int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, const char *event_name, int event_id) @@ -1069,8 +1158,9 @@ static void on_hangup(void *closure) free(cd); } - if (protows->fd >= 0) { - protows->fd = -1; + if (protows->fdev) { + fdev_unref(protows->fdev); + protows->fdev = 0; if (protows->on_hangup) protows->on_hangup(protows->closure); } @@ -1098,7 +1188,7 @@ static const struct afb_ws_itf server_ws_itf = /*****************************************************/ -static struct afb_proto_ws *afb_proto_ws_create(int fd, const struct afb_proto_ws_server_itf *itfs, const struct afb_proto_ws_client_itf *itfc, void *closure, const struct afb_ws_itf *itf) +static struct afb_proto_ws *afb_proto_ws_create(struct fdev *fdev, const struct afb_proto_ws_server_itf *itfs, const struct afb_proto_ws_client_itf *itfc, void *closure, const struct afb_ws_itf *itf) { struct afb_proto_ws *protows; @@ -1106,11 +1196,11 @@ static struct afb_proto_ws *afb_proto_ws_create(int fd, const struct afb_proto_w if (protows == NULL) errno = ENOMEM; else { - fcntl(fd, F_SETFD, FD_CLOEXEC); - fcntl(fd, F_SETFL, O_NONBLOCK); - protows->ws = afb_ws_create(afb_common_get_event_loop(), fd, itf, protows); + fcntl(fdev_fd(fdev), F_SETFD, FD_CLOEXEC); + fcntl(fdev_fd(fdev), F_SETFL, O_NONBLOCK); + protows->ws = afb_ws_create(fdev, itf, protows); if (protows->ws != NULL) { - protows->fd = fd; + protows->fdev = fdev; protows->refcount = 1; protows->subcalls = NULL; protows->closure = closure; @@ -1124,14 +1214,14 @@ static struct afb_proto_ws *afb_proto_ws_create(int fd, const struct afb_proto_w return NULL; } -struct afb_proto_ws *afb_proto_ws_create_client(int fd, const struct afb_proto_ws_client_itf *itf, void *closure) +struct afb_proto_ws *afb_proto_ws_create_client(struct fdev *fdev, const struct afb_proto_ws_client_itf *itf, void *closure) { - return afb_proto_ws_create(fd, NULL, itf, closure, &proto_ws_client_ws_itf); + return afb_proto_ws_create(fdev, NULL, itf, closure, &proto_ws_client_ws_itf); } -struct afb_proto_ws *afb_proto_ws_create_server(int fd, const struct afb_proto_ws_server_itf *itf, void *closure) +struct afb_proto_ws *afb_proto_ws_create_server(struct fdev *fdev, const struct afb_proto_ws_server_itf *itf, void *closure) { - return afb_proto_ws_create(fd, itf, NULL, closure, &server_ws_itf); + return afb_proto_ws_create(fdev, itf, NULL, closure, &server_ws_itf); } void afb_proto_ws_unref(struct afb_proto_ws *protows)