Avoid sending a pending query and fixes
authorJosé Bollo <jose.bollo@iot.bzh>
Tue, 22 Oct 2019 10:06:41 +0000 (12:06 +0200)
committerJosé Bollo <jose.bollo@iot.bzh>
Tue, 22 Oct 2019 14:54:27 +0000 (16:54 +0200)
For some reason, dbus requests many times
the same query. That changes try to avoid that
behaviour.

Also fix asynchronous processing and cache handling.

Change-Id: If91631a1ab51ea8530113d015688978b0cf6467a
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
src/cynagora.c
src/cynagora.h

index aee6a4c..466592d 100644 (file)
 #define CACHESIZE(x)  ((x) >= MIN_CACHE_SIZE ? (x) : (x) ? MIN_CACHE_SIZE : 0)
 
 typedef struct asreq asreq_t;
+typedef struct ascb  ascb_t;
 typedef struct agent agent_t;
 typedef struct query query_t;
 
-/** recording of asynchronous requests */
-struct asreq
+/** recording of asynchronous request callbacks */
+struct ascb
 {
-       /** link to the next pending request */
-       asreq_t *next;
+       /** link to the next pending callback */
+       ascb_t *next;
 
        /** callback function */
        cynagora_async_check_cb_t *callback;
 
        /** closure of the callback */
        void *closure;
+};
+
+/** recording of asynchronous requests */
+struct asreq
+{
+       /** link to the next pending request */
+       asreq_t *next;
+
+       /** callbacks */
+       ascb_t *callbacks;
+
+       /** key of the request */
+       cynagora_key_t key;
 
        /** id of the request */
        idgen_t id;
@@ -134,7 +148,7 @@ struct cynagora
        /** the declared agents */
        agent_t *agents;
 
-       /** the pending queries */
+       /** the pending agent queries */
        query_t *queries;
 
        /** id generator */
@@ -161,6 +175,7 @@ struct query
 };
 
 static void agent_ask(cynagora_t *cynagora, int count, const char **fields);
+static int async_reply_process(cynagora_t *cynagora, int count);
 
 /**
  * Flush the write buffer of the client
@@ -334,22 +349,30 @@ get_reply(
        cynagora_t *cynagora
 ) {
        int rc;
+       const char *first;
        uint32_t cacheid;
 
        prot_next(cynagora->prot);
        rc = prot_get(cynagora->prot, &cynagora->reply.fields);
        if (rc > 0) {
-               if (0 == strcmp(cynagora->reply.fields[0], _clear_)) {
+               first = cynagora->reply.fields[0];
+               if (0 == strcmp(first, _clear_)) {
+                       /* clearing the cache */
                        cacheid = rc > 1 ? (uint32_t)atol(cynagora->reply.fields[1]) : 0;
                        cache_clear(cynagora->cache, cacheid);
                        rc = 0;
-               } else if (0 == strcmp(cynagora->reply.fields[0], _ask_)) {
+               } else if (0 == strcmp(first, _ask_)) {
                        /* on asking agent */
                        agent_ask(cynagora, rc - 1, &cynagora->reply.fields[1]);
                        rc = 0;
                } else {
-                       if (0 != strcmp(cynagora->reply.fields[0], _item_))
+                       if (0 != strcmp(cynagora->reply.fields[0], _item_)) {
                                cynagora->pending--;
+                               if (strcmp(first, _done_) && strcmp(first, _error_)) {
+                                       if (async_reply_process(cynagora, rc))
+                                               rc = 0;
+                               }
+                       }
                }
        }
        cynagora->reply.count = rc;
@@ -441,6 +464,7 @@ static
 int
 status_check(
        cynagora_t *cynagora,
+       int count,
        time_t *expire
 ) {
        int rc;
@@ -454,7 +478,7 @@ status_check(
        else
                rc = -EPROTO;
 
-       if (cynagora->reply.count < 3)
+       if (count < 3)
                *expire = 0;
        else if (cynagora->reply.fields[2][0] == '-')
                *expire = -1;
@@ -653,11 +677,11 @@ check_or_test(
        if (rc < 0)
                goto end;
 
-       /* ensure there is no clear cache pending */
-       flushr(cynagora);
-
        /* check cache item */
        if (!force) {
+               /* ensure there is no clear cache pending */
+               flushr(cynagora);
+
                rc = cache_search(cynagora->cache, key);
                if (rc >= 0)
                        goto end;
@@ -669,7 +693,7 @@ check_or_test(
                /* get the response */
                rc = wait_pending_reply(cynagora);
                if (rc >= 0) {
-                       rc = status_check(cynagora, &expire);
+                       rc = status_check(cynagora, rc, &expire);
                        if (rc >= 0 && action == _check_)
                                cache_put(cynagora->cache, key, rc, expire, true);
                }
@@ -705,6 +729,37 @@ search_async_request(
        return ar;
 }
 
+static
+int
+async_reply_process(
+       cynagora_t *cynagora,
+       int count
+) {
+       int status;
+       const char *id;
+       asreq_t *ar;
+       ascb_t *ac;
+       time_t expire;
+
+       id = count < 2 ? "" : cynagora->reply.fields[1];
+       ar = search_async_request(cynagora, id, true);
+
+       if (!ar)
+               return 0;
+
+       /* emit the asynchronous answer */
+       status = status_check(cynagora, count, &expire);
+       if (status >= 0)
+               cache_put(cynagora->cache, &ar->key, status, expire, true);
+       while((ac = ar->callbacks) != NULL) {
+               ar->callbacks = ac->next;
+               ac->callback(ac->closure, status);
+               free(ac);
+       }
+       free(ar);
+       return 1;
+}
+
 /******************************************************************************/
 /*** PUBLIC COMMON METHODS                                                  ***/
 /******************************************************************************/
@@ -803,11 +858,16 @@ cynagora_async_setup(
        void *closure
 ) {
        asreq_t *ar;
+       ascb_t *ac;
 
        /* cancel pending requests */
        while((ar = cynagora->async.requests) != NULL) {
                cynagora->async.requests = ar->next;
-               ar->callback(ar->closure, -ECANCELED);
+               while((ac = ar->callbacks) != NULL) {
+                       ar->callbacks = ac->next;
+                       ac->callback(ac->closure, -ECANCELED);
+                       free(ac);
+               }
                free(ar);
        }
 
@@ -828,47 +888,12 @@ cynagora_async_process(
        cynagora_t *cynagora
 ) {
        int rc;
-       const char *first;
-       const char *id;
-       asreq_t *ar;
-       time_t expire;
-       cynagora_key_t key;
 
        for (;;) {
                /* non blocking wait for a reply */
                rc = wait_reply(cynagora, false);
                if (rc < 0)
                        return rc == -EAGAIN ? 0 : rc;
-
-               /* skip empty replies */
-               if (rc == 0)
-                       continue;
-
-               /* skip done/error replies */
-               first = cynagora->reply.fields[0];
-               if (!strcmp(first, _done_)
-                || !strcmp(first, _error_))
-                       continue;
-
-               /* search the request */
-               id = cynagora->reply.count < 2 ? "" : cynagora->reply.fields[1];
-               ar = search_async_request(cynagora, id, true);
-
-               /* ignore unexpected answers */
-               if (ar == NULL)
-                       continue;
-
-               /* emit the asynchronous answer */
-               rc = status_check(cynagora, &expire);
-               if (rc >= 0) {
-                       key.client = (const char*)(ar + 1);
-                       key.session = &key.client[1 + strlen(key.client)];
-                       key.user = &key.session[1 + strlen(key.session)];
-                       key.permission = &key.user[1 + strlen(key.user)];
-                       cache_put(cynagora->cache, &key, rc, expire, true);
-               }
-               ar->callback(ar->closure, rc);
-               free(ar);
        }
 }
 
@@ -895,6 +920,8 @@ cynagora_cache_check(
        cynagora_t *cynagora,
        const cynagora_key_t *key
 ) {
+       /* ensure there is no clear cache pending */
+       flushr(cynagora);
        return cache_search(cynagora->cache, key);
 }
 
@@ -930,17 +957,19 @@ cynagora_async_check(
 ) {
        int rc;
        asreq_t *ar;
+       ascb_t *ac;
+       char *p;
 
        /* ensure connection */
        rc = ensure_opened(cynagora);
        if (rc < 0)
                return rc;
 
-       /* ensure there is no clear cache pending */
-       flushr(cynagora);
-
        /* check cache item */
        if (!force) {
+               /* ensure there is no clear cache pending */
+               flushr(cynagora);
+
                rc = cache_search(cynagora->cache, key);
                if (rc >= 0) {
                        callback(closure, rc);
@@ -948,32 +977,67 @@ cynagora_async_check(
                }
        }
 
-       /* allocate */
+       /* allocates the callback */
+       ac = malloc(sizeof *ac);
+       if (ac == NULL)
+               return -ENOMEM;
+       ac->callback = callback;
+       ac->closure = closure;
+
+       /* search the request */
+       ar = cynagora->async.requests;
+       while (ar && (strcmp(key->client, ar->key.client)
+               || strcmp(key->session, ar->key.session)
+               || strcmp(key->user, ar->key.user)
+               || strcmp(key->permission, ar->key.permission)))
+               ar = ar->next;
+
+       /* a same request is pending, use it */
+       if (ar) {
+               ac->next = ar->callbacks;
+               ar->callbacks = ac;
+               return 0;
+       }
+
+       /* allocate for the request */
        ar = malloc(sizeof *ar + strlen(key->client) + strlen(key->session) + strlen(key->user) + strlen(key->permission) + 4);
-       if (ar == NULL)
+       if (ar == NULL) {
+               free(ac);
                return -ENOMEM;
+       }
 
        /* init */
+       ac->next = NULL;
+       ar->callbacks = ac;
+       p = (char*)(ar + 1);
+       ar->key.client = p;
+       p = stpcpy(p, key->client) + 1;
+       ar->key.session = p;
+       p = stpcpy(p, key->session) + 1;
+       ar->key.user = p;
+       p = stpcpy(p, key->user) + 1;
+       ar->key.permission = p;
+       stpcpy(p, key->permission);
        do {
                idgen_next(cynagora->idgen);
        } while (search_async_request(cynagora, cynagora->idgen, false));
        strcpy(ar->id, cynagora->idgen);
-       ar->callback = callback;
-       ar->closure = closure;
-       stpcpy(1 + stpcpy(1 + stpcpy(1 + stpcpy((char*)(ar + 1), key->client), key->session), key->user), key->permission);
+       ar->next = cynagora->async.requests;
+       cynagora->async.requests = ar;
 
        /* send the request */
        rc = putxkv(cynagora, simple ? _test_ : _check_, ar->id, key, 0);
-       if (rc >= 0)
-               rc = flushw(cynagora);
        if (rc < 0) {
+               ar = search_async_request(cynagora, ar->id, true);
+               while((ac = ar->callbacks)) {
+                       ar->callbacks = ac->next;
+                       free(ac);
+               }
                free(ar);
                return rc;
        }
 
        /* record the request */
-       ar->next = cynagora->async.requests;
-       cynagora->async.requests = ar;
        return 0;
 }
 
index 363e725..167b9bc 100644 (file)
@@ -257,7 +257,7 @@ cynagora_test(
 );
 
 /**
- * Check the key asynchronousely (async)
+ * Check the key asynchronously (async)
  *
  * @param cynagora  the handler of the client
  * @param key       the key to query