first add of asynchonous handling
authorJosé Bollo <jose.bollo@iot.bzh>
Wed, 27 Apr 2016 19:25:54 +0000 (21:25 +0200)
committerJosé Bollo <jose.bollo@iot.bzh>
Wed, 27 Apr 2016 19:30:01 +0000 (21:30 +0200)
Change-Id: Id9159d33937dc23342d32892f77998fb8cef0000
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
include/afb-req-itf.h
plugins/afm-main-plugin/afm-main-plugin.c
src/afb-hreq.c
src/afb-hreq.h
src/afb-hsrv.c
src/afb-hsrv.h

index d4ade2b..df133f5 100644 (file)
@@ -44,6 +44,7 @@ struct afb_req {
        void *req_closure;
        void *ctx_closure;
 };
+
 static inline struct afb_arg afb_req_get(struct afb_req req, const char *name)
 {
        return req.itf->get(req.req_closure, name);
index 553cffc..21e27e8 100644 (file)
@@ -51,6 +51,22 @@ static struct afb_evmgr evmgr;
 
 static struct jbus *jbus;
 
+struct memo
+{
+       struct afb_req request;
+       const char *method;
+};
+
+static struct memo *make_memo(struct afb_req request, const char *method)
+{
+       struct memo *memo = malloc(sizeof *memo);
+       if (memo != NULL) {
+               memo->request = request;
+               memo->method = method;
+       }
+       return memo;
+}
+
 static void application_list_changed(const char *data, void *closure)
 {
        afb_evmgr_push(evmgr, "application-list-changed", NULL);
@@ -78,49 +94,70 @@ static struct json_object *embed(const char *tag, struct json_object *obj)
        return result;
 }
 
-static void embed_call_void(struct afb_req request, const char *method)
+static void embed_call_void_callback(int status, struct json_object *obj, struct memo *memo)
 {
-       struct json_object *obj = jbus_call_sj_sync(jbus, method, "true");
        if (interface->verbosity)
-               fprintf(stderr, "(afm-main-plugin) %s(true) -> %s\n", method,
+               fprintf(stderr, "(afm-main-plugin) %s(true) -> %s\n", memo->method,
                        obj ? json_object_to_json_string(obj) : "NULL");
        if (obj == NULL) {
-               afb_req_fail(request, "failed", "framework daemon failure");
-               return;
+               afb_req_fail(memo->request, "failed", "framework daemon failure");
+       } else {
+               obj = json_object_get(obj);
+               obj = embed(memo->method, obj);
+               if (obj == NULL) {
+                       afb_req_fail(memo->request, "failed", "framework daemon failure");
+               } else {
+                       afb_req_success(memo->request, obj, NULL);
+               }
        }
-       obj = json_object_get(obj);
-       obj = embed(method, obj);
+       free(memo);
+}
+
+static void embed_call_void(struct afb_req request, const char *method)
+{
+       struct memo *memo = make_memo(request, method);
+       if (memo == NULL)
+               afb_req_fail(request, "failed", "out of memory");
+       else if (jbus_call_sj(jbus, method, "true", (void*)embed_call_void_callback, memo) < 0) {
+               afb_req_fail(request, "failed", "dbus failure");
+               free(memo);
+       }
+}
+
+static void call_appid_callback(int status, struct json_object *obj, struct memo *memo)
+{
+       if (interface->verbosity)
+               fprintf(stderr, "(afm-main-plugin) %s -> %s\n", memo->method, 
+                       obj ? json_object_to_json_string(obj) : "NULL");
        if (obj == NULL) {
-               afb_req_fail(request, "failed", "framework daemon failure");
-               return;
+               afb_req_fail(memo->request, "failed", "framework daemon failure");
+       } else {
+               obj = json_object_get(obj);
+               afb_req_success(memo->request, obj, NULL);
        }
-       afb_req_success(request, obj, NULL);
+       free(memo);
 }
 
 static void call_appid(struct afb_req request, const char *method)
 {
-       struct json_object *obj;
+       struct memo *memo;
        char *sid;
        const char *id = afb_req_value(request, _id_);
        if (id == NULL) {
                afb_req_fail(request, "bad-request", "missing 'id'");
                return;
        }
-       if (asprintf(&sid, "\"%s\"", id) <= 0) {
+       memo = make_memo(request, method);
+       if (asprintf(&sid, "\"%s\"", id) <= 0 || memo == NULL) {
                afb_req_fail(request, "server-error", "out of memory");
+               free(memo);
                return;
        }
-       obj = jbus_call_sj_sync(jbus, method, sid);
-       if (interface->verbosity)
-               fprintf(stderr, "(afm-main-plugin) %s(%s) -> %s\n", method, sid,
-                       obj ? json_object_to_json_string(obj) : "NULL");
-       free(sid);
-       if (obj == NULL) {
-               afb_req_fail(request, "failed", "framework daemon failure");
-               return;
+       if (jbus_call_sj(jbus, method, sid, (void*)call_appid_callback, memo) < 0) {
+               afb_req_fail(request, "failed", "dbus failure");
+               free(memo);
        }
-       obj = json_object_get(obj);
-       afb_req_success(request, obj, NULL);
+       free(sid);
 }
 
 static void call_runid(struct afb_req request, const char *method)
@@ -143,7 +180,6 @@ static void call_runid(struct afb_req request, const char *method)
        afb_req_success(request, obj, NULL);
 }
 
-
 /************************** entries ******************************/
 
 static void runnables(struct afb_req request)
@@ -250,7 +286,6 @@ static void install(struct afb_req request)
 
        obj = jbus_call_sj_sync(jbus, _install_, query);
        if (interface->verbosity)
-;
                fprintf(stderr, "(afm-main-plugin) install(%s) -> %s\n", query,
                        obj ? json_object_to_json_string(obj) : "NULL");
        free(query);
index c90b237..6b7d64b 100644 (file)
@@ -152,6 +152,10 @@ static void afb_hreq_reply_v(struct afb_hreq *hreq, unsigned status, struct MHD_
 {
        char *cookie;
        const char *k, *v;
+
+       if (hreq->replied != 0)
+               return;
+
        k = va_arg(args, const char *);
        while (k != NULL) {
                v = va_arg(args, const char *);
@@ -164,6 +168,14 @@ static void afb_hreq_reply_v(struct afb_hreq *hreq, unsigned status, struct MHD_
        }
        MHD_queue_response(hreq->connection, status, response);
        MHD_destroy_response(response);
+
+       hreq->replied = 1;
+       if (hreq->suspended != 0) {
+               extern void run_micro_httpd(struct afb_hsrv *hsrv);
+               MHD_resume_connection (hreq->connection);
+               hreq->suspended = 0;
+               run_micro_httpd(hreq->hsrv);
+       }
 }
 
 void afb_hreq_reply(struct afb_hreq *hreq, unsigned status, struct MHD_Response *response, ...)
index 76d1d43..a8df015 100644 (file)
 struct AFB_clientCtx;
 struct json_object;
 struct hreq_data;
+struct afb_hsrv;
 
 struct afb_hreq {
+       struct afb_hsrv *hsrv;
        const char *cacheTimeout;
        struct MHD_Connection *connection;
        int method;
+       int reqid;
+       int scanned;
+       int suspended;
+       int replied;
        const char *version;
        const char *url;
        size_t lenurl;
index f200a96..a833825 100644 (file)
@@ -59,9 +59,11 @@ struct afb_hsrv {
        struct hsrv_handler *handlers;
        struct MHD_Daemon *httpd;
        struct upoll *upoll;
+       int in_run;
        char *cache_to;
 };
 
+static int global_reqids = 0;
 
 static void reply_error(struct MHD_Connection *connection, unsigned int status)
 {
@@ -107,25 +109,34 @@ static int access_handler(
        hsrv = cls;
        hreq = *recordreq;
        if (hreq == NULL) {
-               /* create the request */
-               hreq = calloc(1, sizeof *hreq);
-               if (hreq == NULL)
-                       goto internal_error;
-               *recordreq = hreq;
-
                /* get the method */
                method = get_method(methodstr);
                method &= afb_method_get | afb_method_post;
-               if (method == afb_method_none)
-                       goto bad_request;
+               if (method == afb_method_none) {
+                       reply_error(connection, MHD_HTTP_BAD_REQUEST);
+                       return MHD_YES;
+               }
+
+               /* create the request */
+               hreq = calloc(1, sizeof *hreq);
+               if (hreq == NULL) {
+                       reply_error(connection, MHD_HTTP_INTERNAL_SERVER_ERROR);
+                       return MHD_YES;
+               }
 
                /* init the request */
+               hreq->hsrv = hsrv;
                hreq->cacheTimeout = hsrv->cache_to;
+               hreq->reqid = ++global_reqids;
+               hreq->scanned = 0;
+               hreq->suspended = 0;
+               hreq->replied = 0;
                hreq->connection = connection;
                hreq->method = method;
                hreq->version = version;
                hreq->tail = hreq->url = url;
                hreq->lentail = hreq->lenurl = strlen(url);
+               *recordreq = hreq;
 
                /* init the post processing */
                if (method == afb_method_post) {
@@ -136,10 +147,10 @@ static int access_handler(
                        } else if (strcasestr(type, FORM_CONTENT) != NULL) {
                                hreq->postform = MHD_create_post_processor (connection, 65500, postproc, hreq);
                                if (hreq->postform == NULL)
-                                       goto internal_error;
+                                       afb_hreq_reply_error(hreq, MHD_HTTP_INTERNAL_SERVER_ERROR);
                                return MHD_YES;
                        } else if (strcasestr(type, JSON_CONTENT) == NULL) {
-                               reply_error(connection, MHD_HTTP_UNSUPPORTED_MEDIA_TYPE);
+                               afb_hreq_reply_error(hreq, MHD_HTTP_UNSUPPORTED_MEDIA_TYPE);
                                return MHD_YES;
                        }
                }
@@ -148,30 +159,50 @@ static int access_handler(
        /* process further data */
        if (*upload_data_size) {
                if (hreq->postform != NULL) {
-                       if (!MHD_post_process (hreq->postform, upload_data, *upload_data_size))
-                               goto internal_error;
+                       if (!MHD_post_process (hreq->postform, upload_data, *upload_data_size)) {
+                               afb_hreq_reply_error(hreq, MHD_HTTP_INTERNAL_SERVER_ERROR);
+                               return MHD_YES;
+                       }
                } else {
-                       if (!afb_hreq_post_add(hreq, "", upload_data, *upload_data_size))
-                               goto internal_error;
+                       if (!afb_hreq_post_add(hreq, "", upload_data, *upload_data_size)) {
+                               afb_hreq_reply_error(hreq, MHD_HTTP_INTERNAL_SERVER_ERROR);
+                               return MHD_YES;
+                       }
                }
                *upload_data_size = 0;
-               return MHD_YES;         
+               return MHD_YES;
        }
 
        /* flush the data */
        if (hreq->postform != NULL) {
                rc = MHD_destroy_post_processor(hreq->postform);
                hreq->postform = NULL;
-               if (rc == MHD_NO)
-                       goto bad_request;
+               if (rc == MHD_NO) {
+                       afb_hreq_reply_error(hreq, MHD_HTTP_BAD_REQUEST);
+                       return MHD_YES;
+               }
+       }
+
+       if (hreq->scanned != 0) {
+               if (hreq->replied == 0 && hreq->suspended == 0) {
+                       MHD_suspend_connection (connection);
+                       hreq->suspended = 1;
+               }
+               return MHD_YES;
        }
 
        /* search an handler for the request */
+       hreq->scanned = 1;
        iter = hsrv->handlers;
        while (iter) {
                if (afb_hreq_unprefix(hreq, iter->prefix, iter->length)) {
-                       if (iter->handler(hreq, iter->data))
+                       if (iter->handler(hreq, iter->data)) {
+                               if (hreq->replied == 0 && hreq->suspended == 0) {
+                                       MHD_suspend_connection (connection);
+                                       hreq->suspended = 1;
+                               }
                                return MHD_YES;
+                       }
                        hreq->tail = hreq->url;
                        hreq->lentail = hreq->lenurl;
                }
@@ -181,14 +212,6 @@ static int access_handler(
        /* no handler */
        afb_hreq_reply_error(hreq, MHD_HTTP_NOT_FOUND);
        return MHD_YES;
-
-bad_request:
-       reply_error(connection, MHD_HTTP_BAD_REQUEST);
-       return MHD_YES;
-
-internal_error:
-       reply_error(connection, MHD_HTTP_INTERNAL_SERVER_ERROR);
-       return MHD_YES;
 }
 
 /* Because of POST call multiple time requestApi we need to free POST handle here */
@@ -203,11 +226,19 @@ static void end_handler(void *cls, struct MHD_Connection *connection, void **rec
        afb_hreq_free(hreq);
 }
 
-static void handle_epoll_readable(struct afb_hsrv *hsrv)
+void run_micro_httpd(struct afb_hsrv *hsrv)
 {
-       upoll_on_readable(hsrv->upoll, NULL);
-       MHD_run(hsrv->httpd);
-       upoll_on_readable(hsrv->upoll, (void*)handle_epoll_readable);
+       if (hsrv->in_run != 0)
+               hsrv->in_run = 2;
+       else {
+               upoll_on_readable(hsrv->upoll, NULL);
+               do {
+                       hsrv->in_run = 1;
+                       MHD_run(hsrv->httpd);
+               } while(hsrv->in_run == 2);
+               hsrv->in_run = 0;
+               upoll_on_readable(hsrv->upoll, (void*)run_micro_httpd);
+       }
 };
 
 static int new_client_handler(void *cls, const struct sockaddr *addr, socklen_t addrlen)
@@ -360,7 +391,7 @@ int afb_hsrv_start(struct afb_hsrv *hsrv, uint16_t port, unsigned int connection
                fprintf(stderr, "Error: connection to upoll of httpd failed");
                return 0;
        }
-       upoll_on_readable(upoll, (void*)handle_epoll_readable);
+       upoll_on_readable(upoll, (void*)run_micro_httpd);
 
        hsrv->httpd = httpd;
        hsrv->upoll = upoll;
index b422df4..b328df1 100644 (file)
@@ -24,7 +24,6 @@ struct afb_hreq;
 extern struct afb_hsrv *afb_hsrv_create();
 extern void afb_hsrv_put(struct afb_hsrv *hsrv);
 
-
 extern void afb_hsrv_stop(struct afb_hsrv *hsrv);
 extern int afb_hsrv_start(struct afb_hsrv *hsrv, uint16_t port, unsigned int connection_timeout);
 extern int afb_hsrv_set_cache_timeout(struct afb_hsrv *hsrv, int duration);