X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fafb-stub-ws.c;h=3e9ede2f47c370d6a379e04d49062b630992e0eb;hb=29ae81fa15c6080fd27929f4cc78e1289cb920e9;hp=19b9b61885618e0c479c04a5b04a1ab16055b38d;hpb=7b6940f1524cac6172e71529a989424ff18fb850;p=src%2Fapp-framework-binder.git diff --git a/src/afb-stub-ws.c b/src/afb-stub-ws.c index 19b9b618..3e9ede2f 100644 --- a/src/afb-stub-ws.c +++ b/src/afb-stub-ws.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015-2018 "IoT.bzh" + * Copyright (C) 2015-2019 "IoT.bzh" * Author José Bollo * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -128,6 +128,13 @@ struct afb_stub_ws struct { /* event replica */ struct client_event *events; + + /* robustify */ + struct { + struct fdev *(*reopen)(void*); + void *closure; + void (*release)(void*); + } robust; }; }; @@ -138,9 +145,11 @@ struct afb_stub_ws uint8_t is_client; /* the api name */ - char apiname[1]; + char apiname[]; }; +static struct afb_proto_ws *afb_stub_ws_create_proto(struct afb_stub_ws *stubws, struct fdev *fdev, uint8_t server); + /******************* ws request part for server *****************/ /* decrement the reference count of the request and free/release it on falling to null */ @@ -226,31 +235,49 @@ static struct client_event *client_event_search(struct afb_stub_ws *stubws, uint while (ev != NULL && (ev->id != eventid || 0 != strcmp(afb_evt_event_x2_fullname(ev->event), name))) ev = ev->next; + DEBUG("searching event %s[%d]: %s", name, eventid, ev ? "found" : "not found"); return ev; } +static struct afb_proto_ws *client_get_proto(struct afb_stub_ws *stubws) +{ + struct fdev *fdev; + struct afb_proto_ws *proto; + + proto = stubws->proto; + if (proto == NULL && stubws->robust.reopen) { + fdev = stubws->robust.reopen(stubws->robust.closure); + if (fdev != NULL) + proto = afb_stub_ws_create_proto(stubws, fdev, 0); + } + return proto; +} + /* on call, propagate it to the ws service */ static void client_api_call_cb(void * closure, struct afb_xreq *xreq) { int rc; struct afb_stub_ws *stubws = closure; + struct afb_proto_ws *proto; - if (stubws->proto == NULL) { + proto = client_get_proto(stubws); + if (proto == NULL) { afb_xreq_reply(xreq, NULL, "disconnected", "server hung up"); return; } + afb_xreq_unhooked_addref(xreq); rc = afb_proto_ws_client_call( - stubws->proto, + proto, xreq->request.called_verb, afb_xreq_json(xreq), afb_session_uuid(xreq->context.session), xreq, xreq_on_behalf_cred_export(xreq)); - if (rc >= 0) - afb_xreq_unhooked_addref(xreq); - else + if (rc < 0) { afb_xreq_reply(xreq, NULL, "internal", "can't send message"); + afb_xreq_unhooked_unref(xreq); + } } static void client_on_description_cb(void *closure, struct json_object *data) @@ -264,12 +291,14 @@ static void client_on_description_cb(void *closure, struct json_object *data) static void client_send_describe_cb(int signum, void *closure, struct jobloop *jobloop) { struct client_describe *desc = closure; + struct afb_proto_ws *proto; - if (signum || desc->stubws->proto == NULL) + proto = client_get_proto(desc->stubws); + if (signum || proto == NULL) jobs_leave(jobloop); else { desc->jobloop = jobloop; - afb_proto_ws_client_describe(desc->stubws->proto, client_on_description_cb, desc); + afb_proto_ws_client_describe(proto, client_on_description_cb, desc); } } @@ -312,12 +341,12 @@ static void server_event_push_cb(void *closure, const char *event, int eventid, json_object_put(object); } -static void server_event_broadcast_cb(void *closure, const char *event, int eventid, struct json_object *object) +static void server_event_broadcast_cb(void *closure, const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop) { struct afb_stub_ws *stubws = closure; if (stubws->proto != NULL) - afb_proto_ws_server_event_broadcast(stubws->proto, event, object); + afb_proto_ws_server_event_broadcast(stubws->proto, event, object, uuid, hop); json_object_put(object); } @@ -428,9 +457,9 @@ static void client_on_event_push_cb(void *closure, const char *event_name, int e ERROR("unreadable push event"); } -static void client_on_event_broadcast_cb(void *closure, const char *event_name, struct json_object *data) +static void client_on_event_broadcast_cb(void *closure, const char *event_name, struct json_object *data, const uuid_binary_t uuid, uint8_t hop) { - afb_evt_broadcast(event_name, data); + afb_evt_rebroadcast(event_name, data, uuid, hop); } /*****************************************************/ @@ -494,9 +523,8 @@ static void server_on_call_cb(void *closure, struct afb_proto_ws_call *call, con wreq->call = call; /* init the context */ - if (afb_context_connect(&wreq->xreq.context, sessionid, NULL) < 0) + if (afb_context_connect_validated(&wreq->xreq.context, sessionid) < 0) goto unconnected; - wreq->xreq.context.validated = 1; server_record_session(stubws, wreq->xreq.context.session); if (wreq->xreq.context.created) afb_session_set_autoclose(wreq->xreq.context.session, 1); @@ -618,9 +646,9 @@ static void on_hangup(void *closure) } } -static int enqueue_processing(void (*callback)(int signum, void* arg), void *arg) +static int enqueue_processing(struct afb_proto_ws *proto, void (*callback)(int signum, void* arg), void *arg) { - return jobs_queue(NULL, 0, callback, arg); + return jobs_queue(proto, 0, callback, arg); } /*****************************************************/ @@ -644,7 +672,7 @@ static struct afb_stub_ws *afb_stub_ws_create(struct fdev *fdev, const char *api { struct afb_stub_ws *stubws; - stubws = calloc(1, sizeof *stubws + strlen(apiname)); + stubws = calloc(1, sizeof *stubws + 1 + strlen(apiname)); if (stubws == NULL) errno = ENOMEM; else { @@ -685,6 +713,12 @@ void afb_stub_ws_unref(struct afb_stub_ws *stubws) { if (stubws && !__atomic_sub_fetch(&stubws->refcount, 1, __ATOMIC_RELAXED)) { + if (stubws->is_client) { + stubws->robust.reopen = NULL; + if (stubws->robust.release) + stubws->robust.release(stubws->robust.closure); + } + disconnect(stubws); afb_apiset_unref(stubws->apiset); free(stubws); @@ -713,7 +747,7 @@ struct afb_api_item afb_stub_ws_client_api(struct afb_stub_ws *stubws) assert(stubws->is_client); /* check client */ api.closure = stubws; api.itf = &client_api_itf; - api.group = NULL; + api.group = stubws; /* serialize for reconnections */ return api; } @@ -721,3 +755,15 @@ int afb_stub_ws_client_add(struct afb_stub_ws *stubws, struct afb_apiset *apiset { return afb_apiset_add(apiset, stubws->apiname, afb_stub_ws_client_api(stubws)); } + +void afb_stub_ws_client_robustify(struct afb_stub_ws *stubws, struct fdev *(*reopen)(void*), void *closure, void (*release)(void*)) +{ + assert(stubws->is_client); /* check client */ + + if (stubws->robust.release) + stubws->robust.release(stubws->robust.closure); + + stubws->robust.reopen = reopen; + stubws->robust.closure = closure; + stubws->robust.release = release; +}