X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fafb-stub-ws.c;h=ce68b441cc736fc40b980f891e6a544a827ae4b3;hb=197626868aaf84e9a68e8e7e5397ef1c6883a0f1;hp=8298fc7bdd9ca022b7cccb4f60107774137d37ac;hpb=968c3fca18c7518f240d2487a561e8df8b2907a7;p=src%2Fapp-framework-binder.git diff --git a/src/afb-stub-ws.c b/src/afb-stub-ws.c index 8298fc7b..ce68b441 100644 --- a/src/afb-stub-ws.c +++ b/src/afb-stub-ws.c @@ -35,7 +35,7 @@ #include #include -#include +#include #include "afb-common.h" @@ -50,9 +50,8 @@ #include "afb-evt.h" #include "afb-xreq.h" #include "verbose.h" +#include "jobs.h" -struct client_call; -struct client_event; struct afb_stub_ws; /************** constants for protocol definition *************************/ @@ -68,6 +67,8 @@ struct afb_stub_ws; #define CHAR_FOR_EVT_UNSUBSCRIBE 'U' #define CHAR_FOR_SUBCALL_CALL 'B' #define CHAR_FOR_SUBCALL_REPLY 'R' +#define CHAR_FOR_DESCRIBE 'D' +#define CHAR_FOR_DESCRIPTION 'd' /******************* handling subcalls *****************************/ @@ -122,7 +123,28 @@ struct client_event int refcount; }; -/******************* client description part for server *****************************/ +/* + * structure for recording describe requests + */ +struct client_describe +{ + struct client_describe *next; + struct afb_stub_ws *stubws; + struct jobloop *jobloop; + struct json_object *result; + uint32_t descid; +}; + +/* + * structure for jobs of describing + */ +struct server_describe +{ + struct afb_stub_ws *stubws; + uint32_t descid; +}; + +/******************* stub description for client or servers ******************/ struct afb_stub_ws { @@ -153,9 +175,15 @@ struct afb_stub_ws /* pending subcalls (server side) */ struct server_subcall *subcalls; + /* pending description (client side) */ + struct client_describe *describes; + /* apiset */ struct afb_apiset *apiset; + /* on hangup callback */ + void (*on_hangup)(struct afb_stub_ws *); + /* the api name */ char apiname[1]; }; @@ -354,7 +382,7 @@ static void server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const sc = malloc(sizeof *sc); if (!sc) { - + callback(cb_closure, 1, afb_msg_json_internal_error()); } else { sc->callback = callback; sc->closure = cb_closure; @@ -715,11 +743,11 @@ static void client_reply_fail(struct afb_stub_ws *stubws, struct readbuf *rb) } /* send a subcall reply */ -static void client_send_subcall_reply(struct client_subcall *subcall, int iserror, json_object *object) +static void client_send_subcall_reply(struct client_subcall *subcall, int status, json_object *object) { int rc; struct writebuf wb = { .count = 0 }; - char ie = (char)!!iserror; + char ie = status < 0; if (!writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY) || !writebuf_uint32(&wb, subcall->subcallid) @@ -736,9 +764,9 @@ static void client_send_subcall_reply(struct client_subcall *subcall, int iserro } /* callback for subcall reply */ -static void client_subcall_reply_cb(void *closure, int iserror, json_object *object) +static void client_subcall_reply_cb(void *closure, int status, json_object *object) { - client_send_subcall_reply(closure, iserror, object); + client_send_subcall_reply(closure, status, object); free(closure); } @@ -769,6 +797,31 @@ static void client_subcall(struct afb_stub_ws *stubws, struct readbuf *rb) } } +/* pushs an event */ +static void client_on_description(struct afb_stub_ws *stubws, struct readbuf *rb) +{ + uint32_t descid; + struct client_describe *desc; + struct json_object *object; + + if (!readbuf_uint32(rb, &descid)) + ERROR("unreadable description"); + else { + desc = stubws->describes; + while (desc && desc->descid != descid) + desc = desc->next; + if (desc == NULL) + ERROR("unexpected description"); + else { + if (readbuf_object(rb, &object)) + desc->result = object; + else + ERROR("bad description"); + jobs_leave(desc->jobloop); + } + } +} + /* callback when receiving binary data */ static void client_on_binary(void *closure, char *data, size_t size) { @@ -805,6 +858,9 @@ static void client_on_binary(void *closure, char *data, size_t size) case CHAR_FOR_SUBCALL_CALL: /* subcall */ client_subcall(stubws, &rb); break; + case CHAR_FOR_DESCRIPTION: /* description */ + client_on_description(stubws, &rb); + break; default: /* unexpected message */ /* TODO: close the connection */ break; @@ -866,6 +922,70 @@ end: pthread_mutex_unlock(&stubws->mutex); } +static void client_send_describe_cb(int signum, void *closure, struct jobloop *jobloop) +{ + struct client_describe *desc = closure; + struct writebuf wb = { .count = 0 }; + + if (!signum) { + /* record the jobloop */ + desc->jobloop = jobloop; + + /* send */ + if (writebuf_char(&wb, CHAR_FOR_DESCRIBE) + && writebuf_uint32(&wb, desc->descid) + && afb_ws_binary_v(desc->stubws->ws, wb.iovec, wb.count) >= 0) + return; + } + jobs_leave(jobloop); +} + +/* get the description */ +static struct json_object *client_describe_cb(void * closure) +{ + struct client_describe desc, *d; + struct afb_stub_ws *stubws = closure; + + /* fill in stack the description of the task */ + pthread_mutex_lock(&stubws->mutex); + desc.result = NULL; + desc.descid = ptr2id(&desc); + d = stubws->describes; + while (d) { + if (d->descid != desc.descid) + d = d->next; + else { + desc.descid++; + d = stubws->describes; + } + } + desc.stubws = stubws; + desc.next = stubws->describes; + stubws->describes = &desc; + pthread_mutex_unlock(&stubws->mutex); + + /* synchronous job: send the request and wait its result */ + jobs_enter(NULL, 0, client_send_describe_cb, &desc); + + /* unlink and send the result */ + pthread_mutex_lock(&stubws->mutex); + d = stubws->describes; + if (d == &desc) + stubws->describes = desc.next; + else { + while (d) { + if (d->next != &desc) + d = d->next; + else { + d->next = desc.next; + d = NULL; + } + } + } + pthread_mutex_unlock(&stubws->mutex); + return desc.result; +} + /******************* client description part for server *****************************/ /* on call, propagate it to the ws service */ @@ -923,14 +1043,14 @@ overflow: /* on subcall reply */ static void server_on_subcall_reply(struct afb_stub_ws *stubws, struct readbuf *rb) { - char iserror; + char ie; uint32_t subcallid; struct json_object *object; struct server_subcall *sc, **psc; /* reads the call message data */ if (!readbuf_uint32(rb, &subcallid) - || !readbuf_char(rb, &iserror) + || !readbuf_char(rb, &ie) || !readbuf_object(rb, &object)) { /* TODO bad protocol */ return; @@ -947,12 +1067,62 @@ static void server_on_subcall_reply(struct afb_stub_ws *stubws, struct readbuf * } else { *psc = sc->next; pthread_mutex_unlock(&stubws->mutex); - sc->callback(sc->closure, (int)iserror, object); + sc->callback(sc->closure, -(int)ie, object); free(sc); } json_object_put(object); } +static void server_send_description(struct afb_stub_ws *stubws, uint32_t descid, struct json_object *descobj) +{ + struct writebuf wb = { .count = 0 }; + + if (!writebuf_char(&wb, CHAR_FOR_DESCRIPTION) + || !writebuf_uint32(&wb, descid) + || !writebuf_object(&wb, descobj) + || afb_ws_binary_v(stubws->ws, wb.iovec, wb.count) < 0) + ERROR("can't send description"); +} + +static void server_describe_job(int signum, void *closure) +{ + struct json_object *obj; + struct server_describe *desc = closure; + + /* get the description if possible */ + obj = !signum ? afb_apiset_describe(desc->stubws->apiset, desc->stubws->apiname) : NULL; + + /* send it */ + server_send_description(desc->stubws, desc->descid, obj); + json_object_put(obj); + afb_stub_ws_unref(desc->stubws); + free(desc); +} + +/* on describe, propagate it to the ws service */ +static void server_on_describe(struct afb_stub_ws *stubws, struct readbuf *rb) +{ + + uint32_t descid; + struct server_describe *desc; + + /* reads the descid */ + if (readbuf_uint32(rb, &descid)) { + /* create asynchronous job */ + desc = malloc(sizeof *desc); + if (desc) { + desc->descid = descid; + desc->stubws = stubws; + afb_stub_ws_addref(stubws); + if (jobs_queue(NULL, 0, server_describe_job, desc) < 0) + server_describe_job(0, desc); + return; + } + server_send_description(stubws, descid, NULL); + } + ERROR("can't provide description"); +} + /* callback when receiving binary data */ static void server_on_binary(void *closure, char *data, size_t size) { @@ -965,6 +1135,9 @@ static void server_on_binary(void *closure, char *data, size_t size) case CHAR_FOR_SUBCALL_REPLY: server_on_subcall_reply(closure, &rb); break; + case CHAR_FOR_DESCRIBE: + server_on_describe(closure, &rb); + break; default: /* unexpected message */ /* TODO: close the connection */ break; @@ -1035,6 +1208,8 @@ static void server_on_hangup(void *closure) if (stubws->fd >= 0) { close(stubws->fd); stubws->fd = -1; + if (stubws->on_hangup) + stubws->on_hangup(stubws); } /* release the client */ @@ -1070,7 +1245,8 @@ static const struct afb_ws_itf server_ws_itf = }; static struct afb_api_itf ws_api_itf = { - .call = client_call_cb + .call = client_call_cb, + .describe = client_describe_cb }; /*****************************************************/ @@ -1101,19 +1277,7 @@ static struct afb_stub_ws *afb_stub_ws_create(int fd, const char *apiname, struc struct afb_stub_ws *afb_stub_ws_create_client(int fd, const char *apiname, struct afb_apiset *apiset) { - struct afb_api afb_api; - struct afb_stub_ws *stubws; - - stubws = afb_stub_ws_create(fd, apiname, apiset, &stub_ws_client_ws_itf); - if (stubws) { - afb_api.closure = stubws; - afb_api.itf = &ws_api_itf; - if (afb_apiset_add(apiset, stubws->apiname, afb_api) >= 0) - return stubws; - afb_stub_ws_unref(stubws); - } - return NULL; - + return afb_stub_ws_create(fd, apiname, apiset, &stub_ws_client_ws_itf); } struct afb_stub_ws *afb_stub_ws_create_server(int fd, const char *apiname, struct afb_apiset *apiset) @@ -1156,3 +1320,28 @@ void afb_stub_ws_addref(struct afb_stub_ws *stubws) __atomic_add_fetch(&stubws->refcount, 1, __ATOMIC_RELAXED); } +void afb_stub_ws_on_hangup(struct afb_stub_ws *stubws, void (*on_hangup)(struct afb_stub_ws*)) +{ + stubws->on_hangup = on_hangup; +} + +const char *afb_stub_ws_name(struct afb_stub_ws *stubws) +{ + return stubws->apiname; +} + +struct afb_api afb_stub_ws_client_api(struct afb_stub_ws *stubws) +{ + struct afb_api api; + + assert(!stubws->listener); /* check client */ + api.closure = stubws; + api.itf = &ws_api_itf; + return api; +} + +int afb_stub_ws_client_add(struct afb_stub_ws *stubws, struct afb_apiset *apiset) +{ + return afb_apiset_add(apiset, stubws->apiname, afb_stub_ws_client_api(stubws)); +} +