From f262b0f726ac0577f40525038b779185f144873f Mon Sep 17 00:00:00 2001 From: =?utf8?q?Jos=C3=A9=20Bollo?= Date: Wed, 27 Apr 2016 21:25:54 +0200 Subject: [PATCH] first add of asynchonous handling MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Change-Id: Id9159d33937dc23342d32892f77998fb8cef0000 Signed-off-by: José Bollo --- include/afb-req-itf.h | 1 + plugins/afm-main-plugin/afm-main-plugin.c | 83 +++++++++++++++++++-------- src/afb-hreq.c | 12 ++++ src/afb-hreq.h | 6 ++ src/afb-hsrv.c | 93 ++++++++++++++++++++----------- src/afb-hsrv.h | 1 - 6 files changed, 140 insertions(+), 56 deletions(-) diff --git a/include/afb-req-itf.h b/include/afb-req-itf.h index d4ade2be..df133f5c 100644 --- a/include/afb-req-itf.h +++ b/include/afb-req-itf.h @@ -44,6 +44,7 @@ struct afb_req { void *req_closure; void *ctx_closure; }; + static inline struct afb_arg afb_req_get(struct afb_req req, const char *name) { return req.itf->get(req.req_closure, name); diff --git a/plugins/afm-main-plugin/afm-main-plugin.c b/plugins/afm-main-plugin/afm-main-plugin.c index 553cffc3..21e27e82 100644 --- a/plugins/afm-main-plugin/afm-main-plugin.c +++ b/plugins/afm-main-plugin/afm-main-plugin.c @@ -51,6 +51,22 @@ static struct afb_evmgr evmgr; static struct jbus *jbus; +struct memo +{ + struct afb_req request; + const char *method; +}; + +static struct memo *make_memo(struct afb_req request, const char *method) +{ + struct memo *memo = malloc(sizeof *memo); + if (memo != NULL) { + memo->request = request; + memo->method = method; + } + return memo; +} + static void application_list_changed(const char *data, void *closure) { afb_evmgr_push(evmgr, "application-list-changed", NULL); @@ -78,49 +94,70 @@ static struct json_object *embed(const char *tag, struct json_object *obj) return result; } -static void embed_call_void(struct afb_req request, const char *method) +static void embed_call_void_callback(int status, struct json_object *obj, struct memo *memo) { - struct json_object *obj = jbus_call_sj_sync(jbus, method, "true"); if (interface->verbosity) - fprintf(stderr, "(afm-main-plugin) %s(true) -> %s\n", method, + fprintf(stderr, "(afm-main-plugin) %s(true) -> %s\n", memo->method, obj ? json_object_to_json_string(obj) : "NULL"); if (obj == NULL) { - afb_req_fail(request, "failed", "framework daemon failure"); - return; + afb_req_fail(memo->request, "failed", "framework daemon failure"); + } else { + obj = json_object_get(obj); + obj = embed(memo->method, obj); + if (obj == NULL) { + afb_req_fail(memo->request, "failed", "framework daemon failure"); + } else { + afb_req_success(memo->request, obj, NULL); + } } - obj = json_object_get(obj); - obj = embed(method, obj); + free(memo); +} + +static void embed_call_void(struct afb_req request, const char *method) +{ + struct memo *memo = make_memo(request, method); + if (memo == NULL) + afb_req_fail(request, "failed", "out of memory"); + else if (jbus_call_sj(jbus, method, "true", (void*)embed_call_void_callback, memo) < 0) { + afb_req_fail(request, "failed", "dbus failure"); + free(memo); + } +} + +static void call_appid_callback(int status, struct json_object *obj, struct memo *memo) +{ + if (interface->verbosity) + fprintf(stderr, "(afm-main-plugin) %s -> %s\n", memo->method, + obj ? json_object_to_json_string(obj) : "NULL"); if (obj == NULL) { - afb_req_fail(request, "failed", "framework daemon failure"); - return; + afb_req_fail(memo->request, "failed", "framework daemon failure"); + } else { + obj = json_object_get(obj); + afb_req_success(memo->request, obj, NULL); } - afb_req_success(request, obj, NULL); + free(memo); } static void call_appid(struct afb_req request, const char *method) { - struct json_object *obj; + struct memo *memo; char *sid; const char *id = afb_req_value(request, _id_); if (id == NULL) { afb_req_fail(request, "bad-request", "missing 'id'"); return; } - if (asprintf(&sid, "\"%s\"", id) <= 0) { + memo = make_memo(request, method); + if (asprintf(&sid, "\"%s\"", id) <= 0 || memo == NULL) { afb_req_fail(request, "server-error", "out of memory"); + free(memo); return; } - obj = jbus_call_sj_sync(jbus, method, sid); - if (interface->verbosity) - fprintf(stderr, "(afm-main-plugin) %s(%s) -> %s\n", method, sid, - obj ? json_object_to_json_string(obj) : "NULL"); - free(sid); - if (obj == NULL) { - afb_req_fail(request, "failed", "framework daemon failure"); - return; + if (jbus_call_sj(jbus, method, sid, (void*)call_appid_callback, memo) < 0) { + afb_req_fail(request, "failed", "dbus failure"); + free(memo); } - obj = json_object_get(obj); - afb_req_success(request, obj, NULL); + free(sid); } static void call_runid(struct afb_req request, const char *method) @@ -143,7 +180,6 @@ static void call_runid(struct afb_req request, const char *method) afb_req_success(request, obj, NULL); } - /************************** entries ******************************/ static void runnables(struct afb_req request) @@ -250,7 +286,6 @@ static void install(struct afb_req request) obj = jbus_call_sj_sync(jbus, _install_, query); if (interface->verbosity) -; fprintf(stderr, "(afm-main-plugin) install(%s) -> %s\n", query, obj ? json_object_to_json_string(obj) : "NULL"); free(query); diff --git a/src/afb-hreq.c b/src/afb-hreq.c index c90b2372..6b7d64be 100644 --- a/src/afb-hreq.c +++ b/src/afb-hreq.c @@ -152,6 +152,10 @@ static void afb_hreq_reply_v(struct afb_hreq *hreq, unsigned status, struct MHD_ { char *cookie; const char *k, *v; + + if (hreq->replied != 0) + return; + k = va_arg(args, const char *); while (k != NULL) { v = va_arg(args, const char *); @@ -164,6 +168,14 @@ static void afb_hreq_reply_v(struct afb_hreq *hreq, unsigned status, struct MHD_ } MHD_queue_response(hreq->connection, status, response); MHD_destroy_response(response); + + hreq->replied = 1; + if (hreq->suspended != 0) { + extern void run_micro_httpd(struct afb_hsrv *hsrv); + MHD_resume_connection (hreq->connection); + hreq->suspended = 0; + run_micro_httpd(hreq->hsrv); + } } void afb_hreq_reply(struct afb_hreq *hreq, unsigned status, struct MHD_Response *response, ...) diff --git a/src/afb-hreq.h b/src/afb-hreq.h index 76d1d430..a8df015e 100644 --- a/src/afb-hreq.h +++ b/src/afb-hreq.h @@ -20,11 +20,17 @@ struct AFB_clientCtx; struct json_object; struct hreq_data; +struct afb_hsrv; struct afb_hreq { + struct afb_hsrv *hsrv; const char *cacheTimeout; struct MHD_Connection *connection; int method; + int reqid; + int scanned; + int suspended; + int replied; const char *version; const char *url; size_t lenurl; diff --git a/src/afb-hsrv.c b/src/afb-hsrv.c index f200a960..a8338251 100644 --- a/src/afb-hsrv.c +++ b/src/afb-hsrv.c @@ -59,9 +59,11 @@ struct afb_hsrv { struct hsrv_handler *handlers; struct MHD_Daemon *httpd; struct upoll *upoll; + int in_run; char *cache_to; }; +static int global_reqids = 0; static void reply_error(struct MHD_Connection *connection, unsigned int status) { @@ -107,25 +109,34 @@ static int access_handler( hsrv = cls; hreq = *recordreq; if (hreq == NULL) { - /* create the request */ - hreq = calloc(1, sizeof *hreq); - if (hreq == NULL) - goto internal_error; - *recordreq = hreq; - /* get the method */ method = get_method(methodstr); method &= afb_method_get | afb_method_post; - if (method == afb_method_none) - goto bad_request; + if (method == afb_method_none) { + reply_error(connection, MHD_HTTP_BAD_REQUEST); + return MHD_YES; + } + + /* create the request */ + hreq = calloc(1, sizeof *hreq); + if (hreq == NULL) { + reply_error(connection, MHD_HTTP_INTERNAL_SERVER_ERROR); + return MHD_YES; + } /* init the request */ + hreq->hsrv = hsrv; hreq->cacheTimeout = hsrv->cache_to; + hreq->reqid = ++global_reqids; + hreq->scanned = 0; + hreq->suspended = 0; + hreq->replied = 0; hreq->connection = connection; hreq->method = method; hreq->version = version; hreq->tail = hreq->url = url; hreq->lentail = hreq->lenurl = strlen(url); + *recordreq = hreq; /* init the post processing */ if (method == afb_method_post) { @@ -136,10 +147,10 @@ static int access_handler( } else if (strcasestr(type, FORM_CONTENT) != NULL) { hreq->postform = MHD_create_post_processor (connection, 65500, postproc, hreq); if (hreq->postform == NULL) - goto internal_error; + afb_hreq_reply_error(hreq, MHD_HTTP_INTERNAL_SERVER_ERROR); return MHD_YES; } else if (strcasestr(type, JSON_CONTENT) == NULL) { - reply_error(connection, MHD_HTTP_UNSUPPORTED_MEDIA_TYPE); + afb_hreq_reply_error(hreq, MHD_HTTP_UNSUPPORTED_MEDIA_TYPE); return MHD_YES; } } @@ -148,30 +159,50 @@ static int access_handler( /* process further data */ if (*upload_data_size) { if (hreq->postform != NULL) { - if (!MHD_post_process (hreq->postform, upload_data, *upload_data_size)) - goto internal_error; + if (!MHD_post_process (hreq->postform, upload_data, *upload_data_size)) { + afb_hreq_reply_error(hreq, MHD_HTTP_INTERNAL_SERVER_ERROR); + return MHD_YES; + } } else { - if (!afb_hreq_post_add(hreq, "", upload_data, *upload_data_size)) - goto internal_error; + if (!afb_hreq_post_add(hreq, "", upload_data, *upload_data_size)) { + afb_hreq_reply_error(hreq, MHD_HTTP_INTERNAL_SERVER_ERROR); + return MHD_YES; + } } *upload_data_size = 0; - return MHD_YES; + return MHD_YES; } /* flush the data */ if (hreq->postform != NULL) { rc = MHD_destroy_post_processor(hreq->postform); hreq->postform = NULL; - if (rc == MHD_NO) - goto bad_request; + if (rc == MHD_NO) { + afb_hreq_reply_error(hreq, MHD_HTTP_BAD_REQUEST); + return MHD_YES; + } + } + + if (hreq->scanned != 0) { + if (hreq->replied == 0 && hreq->suspended == 0) { + MHD_suspend_connection (connection); + hreq->suspended = 1; + } + return MHD_YES; } /* search an handler for the request */ + hreq->scanned = 1; iter = hsrv->handlers; while (iter) { if (afb_hreq_unprefix(hreq, iter->prefix, iter->length)) { - if (iter->handler(hreq, iter->data)) + if (iter->handler(hreq, iter->data)) { + if (hreq->replied == 0 && hreq->suspended == 0) { + MHD_suspend_connection (connection); + hreq->suspended = 1; + } return MHD_YES; + } hreq->tail = hreq->url; hreq->lentail = hreq->lenurl; } @@ -181,14 +212,6 @@ static int access_handler( /* no handler */ afb_hreq_reply_error(hreq, MHD_HTTP_NOT_FOUND); return MHD_YES; - -bad_request: - reply_error(connection, MHD_HTTP_BAD_REQUEST); - return MHD_YES; - -internal_error: - reply_error(connection, MHD_HTTP_INTERNAL_SERVER_ERROR); - return MHD_YES; } /* Because of POST call multiple time requestApi we need to free POST handle here */ @@ -203,11 +226,19 @@ static void end_handler(void *cls, struct MHD_Connection *connection, void **rec afb_hreq_free(hreq); } -static void handle_epoll_readable(struct afb_hsrv *hsrv) +void run_micro_httpd(struct afb_hsrv *hsrv) { - upoll_on_readable(hsrv->upoll, NULL); - MHD_run(hsrv->httpd); - upoll_on_readable(hsrv->upoll, (void*)handle_epoll_readable); + if (hsrv->in_run != 0) + hsrv->in_run = 2; + else { + upoll_on_readable(hsrv->upoll, NULL); + do { + hsrv->in_run = 1; + MHD_run(hsrv->httpd); + } while(hsrv->in_run == 2); + hsrv->in_run = 0; + upoll_on_readable(hsrv->upoll, (void*)run_micro_httpd); + } }; static int new_client_handler(void *cls, const struct sockaddr *addr, socklen_t addrlen) @@ -360,7 +391,7 @@ int afb_hsrv_start(struct afb_hsrv *hsrv, uint16_t port, unsigned int connection fprintf(stderr, "Error: connection to upoll of httpd failed"); return 0; } - upoll_on_readable(upoll, (void*)handle_epoll_readable); + upoll_on_readable(upoll, (void*)run_micro_httpd); hsrv->httpd = httpd; hsrv->upoll = upoll; diff --git a/src/afb-hsrv.h b/src/afb-hsrv.h index b422df47..b328df15 100644 --- a/src/afb-hsrv.h +++ b/src/afb-hsrv.h @@ -24,7 +24,6 @@ struct afb_hreq; extern struct afb_hsrv *afb_hsrv_create(); extern void afb_hsrv_put(struct afb_hsrv *hsrv); - extern void afb_hsrv_stop(struct afb_hsrv *hsrv); extern int afb_hsrv_start(struct afb_hsrv *hsrv, uint16_t port, unsigned int connection_timeout); extern int afb_hsrv_set_cache_timeout(struct afb_hsrv *hsrv, int duration); -- 2.16.6