Implements sub queries of agents
authorJosé Bollo <jose.bollo@iot.bzh>
Thu, 24 Oct 2019 17:24:33 +0000 (19:24 +0200)
committerJose Bollo <jose.bollo@iot.bzh>
Fri, 25 Oct 2019 11:55:28 +0000 (13:55 +0200)
Change-Id: I30e40521d8f8a2694df00a5c9f55adfe748fbd68
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
src/cyn-protocol.c
src/cyn-protocol.h
src/cyn-server.c
src/cynagora-protocol.txt
src/cynagora.c
src/cynagora.h
src/expire.c
src/main-cynagora-admin.c
src/main-cynagora-agent.c

index f5d4825..d5391ab 100644 (file)
@@ -46,6 +46,7 @@ const char
        _reply_[] = "reply",
        _rollback_[] = "rollback",
        _set_[] = "set",
+       _sub_[] = "sub",
        _test_[] = "test",
        _yes_[] = "yes";
 
index 2c55df9..65a69d1 100644 (file)
@@ -44,6 +44,7 @@ extern const char
        _reply_[],
        _rollback_[],
        _set_[],
+       _sub_[],
        _test_[],
        _yes_[];
 
index 6c0e373..abf37d7 100644 (file)
@@ -406,6 +406,27 @@ checkcb(
        free(check);
 }
 
+/** allocate the check */
+static
+check_t *
+alloccheck(
+       client_t *cli,
+       const char *id,
+       bool ischeck
+) {
+       check_t *check;
+
+       check = malloc(sizeof *check + 1 + strlen(id));
+       if (check) {
+               strcpy(check->id, id);
+               check->ischeck = ischeck;
+               check->client = cli;
+               check->next = cli->checks;
+               cli->checks = check;
+       }
+       return check;
+}
+
 /** initiate the check */
 static
 void
@@ -417,17 +438,13 @@ makecheck(
 ) {
        data_key_t key;
        check_t *check;
+       const char *id;
 
-       check = malloc(sizeof *check + 1 + strlen(args[1]));
+       id = args[1];
+       check = alloccheck(cli, id, ischeck);
        if (!check)
-               replycheck(cli, args[1], NULL, ischeck);
+               replycheck(cli, id, NULL, ischeck);
        else {
-               strcpy(check->id, args[1]);
-               check->ischeck = ischeck;
-               check->client = cli;
-               check->next = cli->checks;
-               cli->checks = check;
-
                key.client = args[2];
                key.session = args[3];
                key.user = args[4];
@@ -506,6 +523,7 @@ agentcb(
        return 0;
 }
 
+/* treat the reply to an agent query */
 static
 void
 replycb(
@@ -529,6 +547,38 @@ replycb(
        }
 }
 
+/** initiate the check */
+static
+void
+makesub(
+       client_t *cli,
+       const char *args[]
+) {
+       ask_t *ask;
+       data_key_t key;
+       check_t *check;
+       const char *id;
+       const char *askid;
+
+       id = args[2];
+       askid = args[1];
+       ask = searchask(cli, askid, false);
+       if (ask) {
+               check = alloccheck(cli, id, true);
+               if (check) {
+                       key.client = args[3];
+                       key.session = args[4];
+                       key.user = args[5];
+                       key.permission = args[6];
+
+                       cyn_query_subquery_async(
+                                       ask->query, checkcb, check, &key);
+                       return;
+               }
+       }
+       replycheck(cli, id, NULL, true);
+}
+
 /** handle a request */
 static
 void
@@ -676,6 +726,12 @@ onrequest(
                        send_done_or_error(cli, rc);
                        return;
                }
+               if (ckarg(args[0], _sub_, 1) && count == 7) {
+                       if (cli->type != server_Agent)
+                               break;
+                       makesub(cli, args);
+                       return;
+               }
                break;
        case 't': /* test */
                if (ckarg(args[0], _test_, 1) && count == 6) {
index ebefde8..bf255d9 100644 (file)
@@ -118,12 +118,12 @@ synopsis:
        s->c ask ASKID NAME VALUE CLIENT SESSION USER PERMISSION
        c->s reply ASKID ([yes|no] [always|session|one-time|EXPIRE])
 
-### sub queries (agent):
+### sub check (agent):
 
 synopsis:
 
-       c->s sub ASKID (test|check) ID CLIENT SESSION USER PERMISSION
-       s->c (ack|yes|no) ID [EXPIRE]
+       c->s sub ASKID ID CLIENT SESSION USER PERMISSION
+       s->c (yes|no) ID [EXPIRE]
 
 Notes
 -----
index 2e35876..7f3f001 100644 (file)
@@ -756,6 +756,121 @@ async_reply_process(
        return 1;
 }
 
+static
+int
+async_check(
+       cynagora_t *cynagora,
+       const cynagora_key_t *key,
+       int force,
+       int simple,
+       cynagora_async_check_cb_t *callback,
+       void *closure,
+       const char *askid
+) {
+       int rc;
+       asreq_t *ar;
+       ascb_t *ac;
+       char *p;
+       int nf;
+       const char *fields[8];
+
+       /* ensure connection */
+       rc = ensure_opened(cynagora);
+       if (rc < 0)
+               return rc;
+
+       /* check cache item */
+       if (!force) {
+               /* ensure there is no clear cache pending */
+               flushr(cynagora);
+
+               rc = cache_search(cynagora->cache, key);
+               if (rc >= 0) {
+                       callback(closure, rc);
+                       return 0;
+               }
+       }
+
+       /* allocates the callback */
+       ac = malloc(sizeof *ac);
+       if (ac == NULL)
+               return -ENOMEM;
+       ac->callback = callback;
+       ac->closure = closure;
+
+       /* common request only if not subqueries of agents */
+       if (!askid) {
+               /* search the request */
+               ar = cynagora->async.requests;
+               while (ar && (strcmp(key->client, ar->key.client)
+                       || strcmp(key->session, ar->key.session)
+                       || strcmp(key->user, ar->key.user)
+                       || strcmp(key->permission, ar->key.permission)))
+                       ar = ar->next;
+
+               /* a same request is pending, use it */
+               if (ar) {
+                       ac->next = ar->callbacks;
+                       ar->callbacks = ac;
+                       return 0;
+               }
+       }
+
+       /* allocate for the request */
+       ar = malloc(sizeof *ar + strlen(key->client) + strlen(key->session) + strlen(key->user) + strlen(key->permission) + 4);
+       if (ar == NULL) {
+               free(ac);
+               return -ENOMEM;
+       }
+
+       /* init */
+       ac->next = NULL;
+       ar->callbacks = ac;
+       p = (char*)(ar + 1);
+       ar->key.client = p;
+       p = stpcpy(p, key->client) + 1;
+       ar->key.session = p;
+       p = stpcpy(p, key->session) + 1;
+       ar->key.user = p;
+       p = stpcpy(p, key->user) + 1;
+       ar->key.permission = p;
+       stpcpy(p, key->permission);
+       do {
+               idgen_next(cynagora->idgen);
+       } while (search_async_request(cynagora, cynagora->idgen, false));
+       strcpy(ar->id, cynagora->idgen);
+       ar->next = cynagora->async.requests;
+       cynagora->async.requests = ar;
+
+       /* send the request */
+       if (askid) {
+               fields[0] = _sub_;
+               fields[1] = askid;
+               nf = 2;
+       } else {
+               fields[0] = simple ? _test_ : _check_;
+               nf = 1;
+       }
+       fields[nf++] = ar->id;
+       fields[nf++] = key->client;
+       fields[nf++] = key->session;
+       fields[nf++] = key->user;
+       fields[nf++] = key->permission;
+       rc = send_reply(cynagora, fields, nf);
+       if (rc < 0) {
+               ar = search_async_request(cynagora, ar->id, true);
+               while((ac = ar->callbacks)) {
+                       ar->callbacks = ac->next;
+                       free(ac);
+               }
+               free(ar);
+               return rc;
+       }
+
+       /* record the request */
+       return 0;
+}
+
 /******************************************************************************/
 /*** PUBLIC COMMON METHODS                                                  ***/
 /******************************************************************************/
@@ -951,90 +1066,7 @@ cynagora_async_check(
        cynagora_async_check_cb_t *callback,
        void *closure
 ) {
-       int rc;
-       asreq_t *ar;
-       ascb_t *ac;
-       char *p;
-
-       /* ensure connection */
-       rc = ensure_opened(cynagora);
-       if (rc < 0)
-               return rc;
-
-       /* check cache item */
-       if (!force) {
-               /* ensure there is no clear cache pending */
-               flushr(cynagora);
-
-               rc = cache_search(cynagora->cache, key);
-               if (rc >= 0) {
-                       callback(closure, rc);
-                       return 0;
-               }
-       }
-
-       /* allocates the callback */
-       ac = malloc(sizeof *ac);
-       if (ac == NULL)
-               return -ENOMEM;
-       ac->callback = callback;
-       ac->closure = closure;
-
-       /* search the request */
-       ar = cynagora->async.requests;
-       while (ar && (strcmp(key->client, ar->key.client)
-               || strcmp(key->session, ar->key.session)
-               || strcmp(key->user, ar->key.user)
-               || strcmp(key->permission, ar->key.permission)))
-               ar = ar->next;
-
-       /* a same request is pending, use it */
-       if (ar) {
-               ac->next = ar->callbacks;
-               ar->callbacks = ac;
-               return 0;
-       }
-
-       /* allocate for the request */
-       ar = malloc(sizeof *ar + strlen(key->client) + strlen(key->session) + strlen(key->user) + strlen(key->permission) + 4);
-       if (ar == NULL) {
-               free(ac);
-               return -ENOMEM;
-       }
-
-       /* init */
-       ac->next = NULL;
-       ar->callbacks = ac;
-       p = (char*)(ar + 1);
-       ar->key.client = p;
-       p = stpcpy(p, key->client) + 1;
-       ar->key.session = p;
-       p = stpcpy(p, key->session) + 1;
-       ar->key.user = p;
-       p = stpcpy(p, key->user) + 1;
-       ar->key.permission = p;
-       stpcpy(p, key->permission);
-       do {
-               idgen_next(cynagora->idgen);
-       } while (search_async_request(cynagora, cynagora->idgen, false));
-       strcpy(ar->id, cynagora->idgen);
-       ar->next = cynagora->async.requests;
-       cynagora->async.requests = ar;
-
-       /* send the request */
-       rc = putxkv(cynagora, simple ? _test_ : _check_, ar->id, key, 0);
-       if (rc < 0) {
-               ar = search_async_request(cynagora, ar->id, true);
-               while((ac = ar->callbacks)) {
-                       ar->callbacks = ac->next;
-                       free(ac);
-               }
-               free(ar);
-               return rc;
-       }
-
-       /* record the request */
-       return 0;
+       return async_check(cynagora, key, force, simple, callback, closure, NULL);
 }
 
 /******************************************************************************/
@@ -1465,3 +1497,25 @@ cynagora_agent_reply(
        free(query);
        return rc;
 }
+
+/* see cynagora.h */
+int
+cynagora_agent_subquery_async(
+       cynagora_query_t *_query,
+       const cynagora_key_t *key,
+       int force,
+       cynagora_async_check_cb_t *callback,
+       void *closure
+) {
+       int rc;
+       query_t *query = (query_t*)_query;
+       cynagora_t *cynagora;
+
+       cynagora = query->cynagora;
+       if (!cynagora)
+               rc = -ECANCELED;
+       else
+               rc = async_check(cynagora, key, force, false,
+                                       callback, closure, query->askid);
+       return rc;
+}
index 167b9bc..7ae166a 100644 (file)
@@ -499,3 +499,23 @@ cynagora_agent_reply(
        cynagora_query_t *query,
        cynagora_value_t *value
 );
+
+/**
+ * Check a rule as a sub query of the agent
+ *
+ * @param query the related agent query
+ * @param key the key to check
+ * @param force if true forbids cache check
+ * @param callback the callback to handle the asynchronous reply
+ * @param closure the closure to the callback
+ * @return 0 on success or a negative -errno code
+ */
+extern
+int
+cynagora_agent_subquery_async(
+       cynagora_query_t *query,
+       const cynagora_key_t *key,
+       int force,
+       cynagora_async_check_cb_t *callback,
+       void *closure
+);
index a90a8de..baf9b6b 100644 (file)
@@ -97,7 +97,6 @@ static bool parse_time_spec(const char *txt, time_t *time_out)
        return true;
 }
 
-
 /* see expire.h */
 bool txt2exp(const char *txt, time_t *time_out, bool absolute)
 {
index 210084c..aea022a 100644 (file)
@@ -91,7 +91,7 @@ static
 const char
 help__text[] =
        "\n"
-       "Commands are: list, set, drop, acheck, check, atest, test, cache, clear, quit, log, help\n"
+       "Commands are: list, set, drop, acheck, check, test, stest, cache, clear, quit, log, help\n"
        "Type 'help command' to get help on the command\n"
        "Type 'help expiration' to get help on expirations\n"
        "\n"
@@ -180,12 +180,12 @@ help_check_text[] =
 
 static
 const char
-help_acheck_text[] =
+help_scheck_text[] =
        "\n"
-       "Command: acheck client session user permission\n"
+       "Command: scheck client session user permission\n"
        "\n"
-       "Check asynchronously authorization for the given 'client', 'session', 'user', 'permission'.\n"
-       "Same as check but don't wait the answer.\n"
+       "Check synchronously authorization for the given 'client', 'session', 'user', 'permission'.\n"
+       "Same as check but wait the answer.\n"
        "\n"
 ;
 
@@ -209,12 +209,12 @@ help_test_text[] =
 
 static
 const char
-help_atest_text[] =
+help_stest_text[] =
        "\n"
-       "Command: atest client session user permission\n"
+       "Command: stest client session user permission\n"
        "\n"
-       "Test asynchronously authorization for the given 'client', 'session', 'user', 'permission'.\n"
-       "Same as test but don't wait the answer.\n"
+       "Test synchronously authorization for the given 'client', 'session', 'user', 'permission'.\n"
+       "Same as test but wait the answer.\n"
        "\n"
 ;
 
@@ -518,7 +518,7 @@ int do_drop(int ac, char **av)
        return uc;
 }
 
-int do_check(int ac, char **av, int (*f)(cynagora_t*,const cynagora_key_t*,int))
+int do_scheck(int ac, char **av, int (*f)(cynagora_t*,const cynagora_key_t*,int))
 {
        int uc, rc;
 
@@ -556,7 +556,7 @@ int do_cache_check(int ac, char **av)
        return uc;
 }
 
-void acheck_cb(void *closure, int status)
+void check_cb(void *closure, int status)
 {
        if (status > 0)
                fprintf(stdout, "allowed\n");
@@ -567,14 +567,14 @@ void acheck_cb(void *closure, int status)
        pending--;
 }
 
-int do_acheck(int ac, char **av, bool simple)
+int do_check(int ac, char **av, bool simple)
 {
        int uc, rc;
 
        rc = get_csup(ac, av, &uc, NULL);
        if (rc == 0) {
                pending++;
-               rc = cynagora_async_check(cynagora, &key, 0, simple, acheck_cb, NULL);
+               rc = cynagora_async_check(cynagora, &key, 0, simple, check_cb, NULL);
                if (rc < 0) {
                        fprintf(stderr, "error %s\n", strerror(-rc));
                        pending--;
@@ -615,12 +615,12 @@ int do_help(int ac, char **av)
                fprintf(stdout, "%s", help_drop_text);
        else if (ac > 1 && !strcmp(av[1], "check"))
                fprintf(stdout, "%s", help_check_text);
-       else if (ac > 1 && !strcmp(av[1], "acheck"))
-               fprintf(stdout, "%s", help_acheck_text);
+       else if (ac > 1 && !strcmp(av[1], "scheck"))
+               fprintf(stdout, "%s", help_scheck_text);
        else if (ac > 1 && !strcmp(av[1], "test"))
                fprintf(stdout, "%s", help_test_text);
-       else if (ac > 1 && !strcmp(av[1], "atest"))
-               fprintf(stdout, "%s", help_atest_text);
+       else if (ac > 1 && !strcmp(av[1], "stest"))
+               fprintf(stdout, "%s", help_stest_text);
        else if (ac > 1 && !strcmp(av[1], "cache"))
                fprintf(stdout, "%s", help_cache_text);
        else if (ac > 1 && !strcmp(av[1], "clear"))
@@ -654,17 +654,17 @@ int do_any(int ac, char **av)
        if (!strcmp(av[0], "drop"))
                return do_drop(ac, av);
 
+       if (!strcmp(av[0], "scheck"))
+               return do_scheck(ac, av, cynagora_check);
+
        if (!strcmp(av[0], "check"))
-               return do_check(ac, av, cynagora_check);
+               return do_check(ac, av, 0);
 
-       if (!strcmp(av[0], "acheck"))
-               return do_acheck(ac, av, 0);
+       if (!strcmp(av[0], "stest"))
+               return do_scheck(ac, av, cynagora_test);
 
        if (!strcmp(av[0], "test"))
-               return do_check(ac, av, cynagora_test);
-
-       if (!strcmp(av[0], "atest"))
-               return do_acheck(ac, av, 1);
+               return do_check(ac, av, 1);
 
        if (!strcmp(av[0], "cache"))
                return do_cache_check(ac, av);
index 88ed995..99d7338 100644 (file)
@@ -23,6 +23,7 @@
 #include <stdint.h>
 #include <stdbool.h>
 #include <stdio.h>
+#include <stdarg.h>
 #include <stdlib.h>
 #include <string.h>
 #include <getopt.h>
 #include "cynagora.h"
 #include "expire.h"
 
+#define BUFFER_SIZE     500
+#define ARGUMENT_COUNT   10
+#define QUERY_COUNT      40
+#define SUBQUERY_COUNT   80
+
+#define FD_FOR_STDIN    -1
+#define FD_FOR_CYNAGORA -2
+
+
+#define _LONGHELP_    'H'
 #define _HELP_        'h'
 #define _SOCKET_      's'
 #define _VERSION_     'v'
 
 static
 const char
-shortopts[] = "hps:v";
+shortopts[] = "Hhps:v";
 
 static
 const struct option
 longopts[] = {
+       { "long-help", 0, NULL, _LONGHELP_ },
        { "help", 0, NULL, _HELP_ },
        { "piped", 0, NULL, _PIPED_ },
        { "socket", 1, NULL, _SOCKET_ },
@@ -61,20 +73,66 @@ static
 const char
 helptxt[] =
        "\n"
-       "usage: cynagora-agent [options]... name [program]\n"
+       "usage: cynagora-agent [options]... name [program [args]...]\n"
        "\n"
-       "otpions:\n"
+       "options:\n"
        "       -s, --socket xxx      set the base xxx for sockets\n"
        "       -p, --piped           replace stdin/out by out/in of program"
-       "       -h, --help            print this help and exit\n"
+       "       -h, --help            print short help and exit\n"
+       "       -H, --long-help       print long help and exit\n"
        "       -v, --version         print the version and exit\n"
        "\n"
-       "When program is given, cynagora-agent performs invoke it with\n"
-       "arguments VALUE CLIENT SESSION USER PERMISSION and expect it\n"
-       "to echo the result with optional expiration\n"
-       "Otherwise cynagora-agent echo its requests on one line:\n"
-       "ID VALUE CLIENT SESSION USER PERMISSION and expect to read the\n"
-       "replies: ID RESULT [EXPIRE]\n"
+;
+
+static
+const char
+longhelptxt[] =
+       "When no program is given, cynagora-agent output queries as below\n"
+       "\n"
+       "    ID VALUE CLIENT SESSION USER PERMISSION\n"
+       "\n"
+       "where ID is a numeric identifier, VALUE is the value associated\n"
+       "to the agent and client, session, user and permission are from the\n"
+       "query.\n"
+       "\n"
+       "For the replies, it reads from its input:\n"
+       "\n"
+       "    ID (yes|no) [expire]\n"
+       "\n"
+       "For the sub queries, it reads from its input:\n"
+       "\n"
+       "    ID sub NUM CLIENT SESSION USER PERMISSION\n"
+       "\n"
+       "Where NUM is a numeric identifier. It will reply to sub queries with:\n"
+       "\n"
+       "    reply NUM (yes|no)\n"
+       "\n"
+       "When the option --piped is given, the input and output are connected\n"
+       "to the output and input of the given program.\n"
+       "\n"
+       "When program is given but not the option --piped then an instance of\n"
+       "program is invoked for each agent query with predefined environment\n"
+       "variable set: \n"
+       "       - CYAG_VALUE       value associated to the agent\n"
+       "       - CYAG_CLIENT      client of the query\n"
+       "       - CYAG_SESSION     session of the query\n"
+       "       - CYAG_USER        user of the query\n"
+       "       - CYAG_PERMISSION  permission of the query\n"
+       "\n"
+       "The program will reply\n"
+       "\n"
+       "    (yes|no) [expire]\n"
+       "\n"
+       "and then terminates quickly.\n"
+       "It can also ask for sub-queries:\n"
+       "\n"
+       "    sub NUM CLIENT SESSION USER PERMISSION\n"
+       "\n"
+       "Where NUM is a numeric identifier. It will reply to sub queries with:\n"
+       "\n"
+       "    reply NUM (yes|no)\n"
+       "\n"
+       "And the program terminates quickly.\n"
        "\n"
 ;
 
@@ -84,50 +142,23 @@ versiontxt[] =
        "cynagora-agent version 1.99.99\n"
 ;
 
-typedef struct {
-       int filled;
-       char buffer[4000];
-} buf_t;
-
-typedef struct {
-       int count;
-       char *args[20];
-} args_t;
-
-typedef struct {
-       int id;
-       cynagora_query_t *query;
-} query_t;
-
-typedef struct {
-       pid_t pid;
-       int id;
-       int io[2];
-} proc_t;
-
 static char *name;
 static char **prog;
 static int piped;
 static cynagora_t *cynagora;
-static int nexid;
 
-static buf_t buffer;
-static query_t queries[200];
-static proc_t procs[200];
+/******************************************************************************/
+/******************************************************************************/
 
-int qidx(int id)
-{
-       int r = sizeof queries / sizeof *queries;
-       while (r-- && queries[r].id != id);
-       return r;
-}
+typedef struct {
+       int filled;
+       char buffer[BUFFER_SIZE];
+} buf_t;
 
-int pidx(pid_t pid)
-{
-       int r = sizeof procs / sizeof *procs;
-       while (r-- && procs[r].pid != pid);
-       return r;
-}
+typedef struct {
+       int count;
+       char *args[ARGUMENT_COUNT];
+} args_t;
 
 int buf_parse(buf_t *buf, args_t *args)
 {
@@ -176,6 +207,7 @@ void buf_unprefix(buf_t *buf, int count)
        }
 }
 
+/* read the 'buf' from 'fd' and call 'fun' if line of args exists */
 void read_and_dispatch(int fd, buf_t *buf, void (*fun)(int,char**))
 {
        int n;
@@ -196,6 +228,63 @@ void read_and_dispatch(int fd, buf_t *buf, void (*fun)(int,char**))
        }
 }
 
+/******************************************************************************/
+/******************************************************************************/
+
+typedef struct {
+       cynagora_query_t *query;
+       pid_t pid;
+       int id;
+       int io[2];
+       buf_t buf;
+} query_t;
+
+typedef struct {
+       int me;
+       int id;
+       int num;
+} subq_t;
+
+static query_t queries[QUERY_COUNT];
+static subq_t subqs[SUBQUERY_COUNT];
+
+static int nexid;
+static int nexme;
+static int qx;
+static int efd;
+
+int qidx(int id)
+{
+       int r = sizeof queries / sizeof *queries;
+       while (r-- && queries[r].id != id);
+       return r;
+}
+
+int pidx(pid_t pid)
+{
+       int r = sizeof queries / sizeof *queries;
+       while (r-- && queries[r].pid != pid);
+       return r;
+}
+
+int sidx(int me)
+{
+       int r = sizeof subqs / sizeof *subqs;
+       while (r-- && subqs[r].me != me);
+       return r;
+}
+
+/*
+ * pipes and forks
+ *
+ * for the parent it returns the pid > 0 of the child and in io[0] the input
+ * from child and in io[1] to output to child.
+ *
+ * for the child it return 0 and stdin (0) comes from the prent and stout (1)
+ * outputs to parent (io is not used)
+ *
+ * returns -1 in case of error with errno appropriately
+ */
 pid_t split(int io[2])
 {
        int rc;
@@ -230,95 +319,232 @@ pid_t split(int io[2])
        return pid;
 }
 
-void deadchild(int sig, siginfo_t *info, void *item)
+/* like printf but with fd and sync */
+int emit(int fd, const char *fmt, ...)
 {
-       int i;
-       pid_t pid;
-       buf_t buf;
-       args_t args;
-       ssize_t sz;
+       int n, w, p;
+       va_list ap;
+       char buffer[2000];
 
-       pid = info->si_pid;
-       i = pidx(pid);
-       if (i >= 0) {
-               args.count = 0;
-               sz = read(procs[i].io[0], buf.buffer, sizeof buf.buffer);
-               if (sz > 0) {
-                       buf.filled = (int)sz;
-                       buf_parse(&buf, &args);
-               }
-               if (!args.count) {
-                       args.args[0] = "no";
-                       args.args[1] = "-";
-                       args.count = 2;
+       va_start(ap, fmt);
+       n = vsnprintf(buffer, sizeof buffer, fmt, ap);
+       va_end(ap);
+
+       if (n >= (int)sizeof buffer) {
+               errno = EINVAL;
+               return -1;
+       }
+       p = 0;
+       while (p < n) {
+               w = (int)write(fd, &buffer[p], (size_t)(n - p));
+               if (w > 0)
+                       p += w;
+               else if (errno != EINTR)
+                       return -1;
+       }
+       fsync(fd);
+       return 0;
+
+}
+
+void clear(int id)
+{
+       int r;
+       struct epoll_event e;
+
+       r = sizeof subqs / sizeof *subqs;
+       while (r)
+               if (subqs[--r].id == id)
+                       memset(&subqs[r], 0, sizeof subqs[r]);
+
+       r = sizeof queries / sizeof *queries;
+       while (r)
+               if (queries[--r].id == id) {
+                       if (queries[r].pid) {
+                               memset(&e, 0, sizeof e);
+                               epoll_ctl(efd, EPOLL_CTL_DEL, queries[r].io[0], &e);
+                               close(queries[r].io[0]);
+                               close(queries[r].io[1]);
+                       }
+                       memset(&queries[r], 0, sizeof queries[r]);
                }
-               if (args.count == 1)
-                       printf("%d %s\n", procs[i].id, args.args[0]);
+}
+
+int reply(int q, char *diag, char *expire)
+{
+       cynagora_value_t value;
+       cynagora_query_t *query;
+
+       query = queries[q].query;
+       value.value = strdupa(diag ?: "no");
+       txt2exp(expire ?: "*", &value.expire, true);
+       clear(queries[q].id);
+       return cynagora_agent_reply(query, &value);
+}
+
+void terminate(int id)
+{
+       int q;
+
+       if (id) {
+               q = qidx(id);
+               if (q >= 0)
+                       reply(q, NULL, NULL);
                else
-                       printf("%d %s %s\n", procs[i].id, args.args[0], args.args[1]);
-               fflush(stdout);
-               close(procs[i].io[0]);
-               close(procs[i].io[1]);
-               procs[i].pid = 0;
+                       clear(id);
        }
-       waitpid(pid, NULL, 0);
 }
 
-void onquery(int ac, char **av)
+void on_subquery_reply(void *closure, int status)
 {
-       int i;
-       pid_t pid;
+       int me, s, q, fd;
 
-       i = pidx(0);
-       if (ac == 6 && i >= 0) {
-               procs[i].pid = pid = split(procs[i].io);
-               if (pid >= 0) {
-                       procs[i].id = atoi(av[0]);
-                       if (!pid) {
-                               setenv("CYAG_VALUE", av[1], 1);
-                               setenv("CYAG_CLIENT", av[2], 1);
-                               setenv("CYAG_SESSION", av[3], 1);
-                               setenv("CYAG_USER", av[4], 1);
-                               setenv("CYAG_PERMISSION", av[5], 1);
-                               execvp(prog[0], prog);
-                               fprintf(stderr, "error: can't exec %s: %s\n", prog[0], strerror(errno));
-                               exit(1);
-                       }
-                       return;
-                       close(procs[i].io[0]);
-                       close(procs[i].io[1]);
+
+       me = (int)(intptr_t)closure;
+       s = sidx(me);
+       if (s >= 0) {
+               q = qidx(subqs[s].id);
+               if (q > 0) {
+                       fd = prog ? queries[q].io[1] : 1;
+                       emit(fd, "reply %d %s\n", subqs[s].num, status ? "yes" : "no");
                }
-               procs[i].pid = 0;
+               memset(&subqs[s], 0, sizeof subqs[s]);
        }
-       fprintf(stdout, "%s no -\n", av[0]);
+
 }
 
-int runloop()
+int subquery(int q, int num,  char *client, char *session, char *user, char *permission)
+{
+       int rc, me, s;
+       cynagora_key_t key;
+
+       /* get an id */
+       do {
+               me = ++nexme;
+               if (me < 0)
+                       me = nexme = 1;
+       } while (sidx(me) >= 0);
+
+       s = sidx(0);
+       subqs[s].me = me;
+       subqs[s].id = queries[q].id;
+       subqs[s].num = num;
+
+       key.client = client ?: "?";
+       key.session = session ?: "?";
+       key.user = user ?: "?";
+       key.permission = permission ?: "?";
+
+       rc = cynagora_agent_subquery_async(queries[q].query, &key, 0, on_subquery_reply, (void*)(intptr_t)me);
+       return rc;
+}
+
+int launch(int q)
+{
+       int rc;
+       int io[2];
+       pid_t pid;
+
+       pid = split(io);
+       if (pid < 0)
+               rc = -1;
+       else if (pid == 0) {
+               setenv("CYAG_VALUE", queries[q].query->value, 1);
+               setenv("CYAG_CLIENT", queries[q].query->key.client, 1);
+               setenv("CYAG_SESSION", queries[q].query->key.session, 1);
+               setenv("CYAG_USER", queries[q].query->key.user, 1);
+               setenv("CYAG_PERMISSION", queries[q].query->key.permission, 1);
+               execvp(prog[0], prog);
+               emit(2, "error: can't exec %s: %s\n", prog[0], strerror(errno));
+               exit(1);
+       } else {
+               queries[q].pid = pid;
+               queries[q].io[0] = io[0];
+               queries[q].io[1] = io[1];
+               rc = 0;
+       }
+       return rc;
+}
+
+void dispatch(int q, int ac, char **av)
+{
+       if (q < 0)
+               return;
+
+       if (ac < 1 || strcmp(av[0], "sub")) {
+               reply(q, av[0], ac > 1 ? av[1] : NULL);
+       } else {
+               subquery(q, ac > 1 ? atoi(av[1]) : 1,
+                       ac > 2 ? av[2] : NULL,
+                       ac > 3 ? av[3] : NULL,
+                       ac > 4 ? av[4] : NULL,
+                       ac > 5 ? av[5] : NULL);
+       }
+}
+
+void dispatch_direct(int ac, char **av)
+{
+       int q, qid;
+
+       qid = atoi(av[0]);
+       q = qidx(qid);
+       if (q < 0)
+               return;
+
+       dispatch(q, ac - 1, &av[1]);
+}
+
+void dispatch_fork(int ac, char **av)
+{
+       dispatch(qx, ac, av);
+}
+
+void process_fork(int id)
+{
+       int q = qidx(id);
+       if (q >= 0) {
+               qx = q;
+               read_and_dispatch(queries[q].io[0], &queries[q].buf, dispatch_fork);
+       }
+}
+
+/* handles death of a child */
+void deadchild(int sig, siginfo_t *info, void *item)
+{
+       pid_t pid;
+
+       if (piped) {
+               exit(info->si_code == CLD_EXITED ? info->si_status : 127);
+               return;
+       }
+
+       pid = info->si_pid;
+/*
+       int q, id;
+
+       q = pidx(pid);
+       if (q >= 0) {
+               id = queries[q].id;
+               qx = q;
+               read_and_dispatch(queries[q].io[0], &queries[q].buf, dispatch_fork);
+               terminate(id);
+       }
+*/
+       waitpid(pid, NULL, 0);
+}
+
+int setup_deadchild()
 {
-       struct pollfd pfd;
        struct sigaction sigact;
 
        /* set the signal handler */
        sigact.sa_sigaction = deadchild;
        sigemptyset(&sigact.sa_mask);
        sigact.sa_flags = SA_NOCLDSTOP | SA_SIGINFO;
-       sigaction(SIGCHLD, &sigact, NULL);
-
-       pfd.fd = 0;
-       pfd.events = POLLIN;
-       for(;;) {
-               pfd.revents = 0;
-               poll(&pfd, 1, -1);
-               if (pfd.revents & POLLIN)
-                       read_and_dispatch(0, &buffer, onquery);
-               if (pfd.revents & POLLHUP)
-                       break;
-       }
-
-       return 0;
+       return sigaction(SIGCHLD, &sigact, NULL);
 }
 
-int setup_child()
+int run_piped_program()
 {
        int rc, io[2];
        pid_t pid;
@@ -327,6 +553,7 @@ int setup_child()
        pid = split(io);
        if (pid < 0)
                return -1;
+
        if (pid) {
                close(0);
                dup(io[0]);
@@ -338,20 +565,25 @@ int setup_child()
        }
 
        /* run piped if required */
-       if (piped) {
-               rc = execvp(prog[0], prog);
-               fprintf(stderr, "error: can't exec %s: %s\n", prog[0], strerror(errno));
-       } else {
-               rc = runloop();
-               if (rc)
-                       fprintf(stderr, "error: can't loop: %s\n", strerror(errno));
-       }
+       rc = execvp(prog[0], prog);
+       emit(2, "error: can't exec %s: %s\n", prog[0], strerror(errno));
        exit(!!rc);
+       return rc;
+}
+
+int async_ctl(void *closure, int op, int fd, uint32_t events)
+{
+       struct epoll_event e;
+       memset(&e, 0, sizeof e);
+       e.events = events;
+       e.data.fd = FD_FOR_CYNAGORA;
+       return epoll_ctl(efd, op, fd, &e);
 }
 
 int agentcb(void *closure, cynagora_query_t *query)
 {
-       int i, id, rc;
+       int q, id, rc;
+       struct epoll_event e;
 
        /* get an id */
        do {
@@ -361,62 +593,35 @@ int agentcb(void *closure, cynagora_query_t *query)
        } while (qidx(id) >= 0);
 
        /* get an index */
-       i = qidx(0);
-       if (i < 0)
+       q = qidx(0);
+       if (q < 0)
                return -ECANCELED;
 
-       queries[i].id = id;
-       queries[i].query = query;
+       queries[q].id = id;
+       queries[q].query = query;
 
        /* compose the value */
-       rc = fprintf(stdout, "%d %s %s %s %s %s\n",
-               id, query->value, query->key.client, query->key.session,
-               query->key.user, query->key.permission);
+       if (prog) {
+               rc = launch(q);
+               if (rc == 0) {
+                       memset(&e, 0, sizeof e);
+                       e.events = EPOLLIN;
+                       e.data.fd = id;
+                       rc = epoll_ctl(efd, EPOLL_CTL_ADD, queries[q].io[0], &e);
+               }
+       } else {
+               rc = emit(1, "%d %s %s %s %s %s\n",
+                       id, query->value, query->key.client, query->key.session,
+                       query->key.user, query->key.permission);
+       }
        if (rc < 0) {
-               queries[i].query = NULL;
-               queries[i].id = 0;
+               clear(id);
                return -ECANCELED;
        }
 
        return 0;
 }
 
-void onreply(int ac, char **av)
-{
-       int i, id;
-       cynagora_value_t value;
-
-       id = atoi(av[0]);
-       i = qidx(id);
-       if (i >= 0) {
-               value.value = "no";
-               value.expire = 0;
-               if (ac > 1)
-                       value.value = av[1];
-               if (ac > 2)
-                       txt2exp(av[2], &value.expire, true);
-               cynagora_agent_reply(queries[i].query, &value);
-               queries[i].query = NULL;
-               queries[i].id = 0;
-       }
-}
-
-int async_ctl(void *closure, int op, int fd, uint32_t events)
-{
-       int *pfd = closure;
-
-       switch(op) {
-       case EPOLL_CTL_ADD:
-       case EPOLL_CTL_MOD:
-               *pfd = fd;
-               break;
-       case EPOLL_CTL_DEL:
-               *pfd = -1;
-               break;
-       }
-       return 0;
-}
-
 int main(int ac, char **av)
 {
        int opt;
@@ -424,8 +629,9 @@ int main(int ac, char **av)
        int help = 0;
        int version = 0;
        int error = 0;
+       struct epoll_event e;
        char *socket = NULL;
-       struct pollfd fds[2];
+       buf_t buffer = { .filled = 0 };
 
        /* scan arguments */
        for (;;) {
@@ -434,8 +640,11 @@ int main(int ac, char **av)
                        break;
 
                switch(opt) {
+               case _LONGHELP_:
+                       help = 2;
+                       break;
                case _HELP_:
-                       help = 1;
+                       help++;
                        break;
                case _PIPED_:
                        piped = 1;
@@ -454,81 +663,111 @@ int main(int ac, char **av)
 
        /* handles help, version, error */
        if (help) {
-               fprintf(stdout, helptxt);
+               printf("%s", helptxt);
+               if (help > 1)
+                       printf("%s", longhelptxt);
                return 0;
        }
        if (version) {
-               fprintf(stdout, versiontxt);
+               printf("%s", versiontxt);
                return 0;
        }
 
        /* check agent name */
        if (optind == ac) {
-               fprintf(stderr, "error: name missing\n");
+               emit(2, "error: name missing\n");
                error = 1;
        } else {
                name = av[optind++];
                if (!cynagora_agent_is_valid_name(name)) {
-                       fprintf(stderr, "error: invalid agent name %s\n", name);
+                       emit(2, "error: invalid agent name %s\n", name);
                        error = 1;
-               } else if (optind == ac) {
-                       prog = NULL;
-               } else {
+               } else if (optind < ac) {
                        prog = &av[optind++];
+               }else if (piped) {
+                       emit(2, "error: piped without program\n");
+                       error = 1;
+               } else {
+                       prog = NULL;
                }
        }
        if (error)
                return 1;
 
+       /* create the polling */
+       efd = epoll_create1(EPOLL_CLOEXEC);
+       if (efd < 0) {
+               emit(2, "error: epoll_create failed, %s\n", strerror(errno));
+               return 1;
+       }
+
        /* initialize server */
        signal(SIGPIPE, SIG_IGN); /* avoid SIGPIPE! */
        rc = cynagora_create(&cynagora, cynagora_Agent, 0, socket);
        if (rc < 0) {
-               fprintf(stderr, "error: initialization failed, %s\n", strerror(-rc));
+               emit(2, "error: initialization failed, %s\n", strerror(-rc));
                return 1;
        }
-       fds[1].fd = -1;
-       rc = cynagora_async_setup(cynagora, async_ctl, &fds[1].fd);
+       rc = cynagora_async_setup(cynagora, async_ctl, NULL);
        if (rc < 0) {
-               fprintf(stderr, "error: asynchronous setup failed, %s\n", strerror(-rc));
+               emit(2, "error: asynchronous setup failed, %s\n", strerror(-rc));
                return 1;
        }
 
        /* create the agent */
        rc = cynagora_agent_create(cynagora, name, agentcb, NULL);
        if (rc < 0) {
-               fprintf(stderr, "error: creation of agent failed, %s\n", strerror(-rc));
+               emit(2, "error: creation of agent failed, %s\n", strerror(-rc));
                return 1;
        }
 
        /* setup piped */
-       if (prog) {
-               rc = setup_child();
+       setup_deadchild();
+       if (piped) {
+               rc = run_piped_program();
                if (rc < 0) {
-                       fprintf(stderr, "error: can't setup child, %s\n", strerror(errno));
+                       emit(2, "error: can't run piped program, %s\n", strerror(errno));
                        return 1;
                }
+               prog = NULL;
        }
 
-       /* setup output */
-       setlinebuf(stdout);
+       /* catch input if needed */
+       if (!prog) {
+               memset(&e, 0, sizeof e);
+               e.events = EPOLLIN;
+               e.data.fd = FD_FOR_STDIN;
+               rc = epoll_ctl(efd, EPOLL_CTL_ADD, 0, &e);
+               if (rc < 0) {
+                       emit(2, "error: set epoll, %s\n", strerror(errno));
+                       return 1;
+               }
+       }
 
-       fcntl(0, F_SETFL, O_NONBLOCK);
-       fds[0].fd = 0;
-       fds[0].events = fds[1].events = POLLIN;
        for(;;) {
-               rc = poll(fds, 2, -1);
-               if (fds[0].revents & POLLIN)
-                       read_and_dispatch(0, &buffer, onreply);
-               if (fds[1].revents & POLLIN) {
-                       rc = cynagora_async_process(cynagora);
-                       if (rc < 0)
-                               fprintf(stderr, "asynchronous processing failed: %s\n", strerror(-rc));
+               rc = epoll_wait(efd, &e, 1, -1);
+               if (rc == 1) {
+                       if (e.events & EPOLLIN) {
+                               if (e.data.fd == FD_FOR_STDIN) {
+                                       read_and_dispatch(0, &buffer, dispatch_direct);
+                               } else if (e.data.fd == FD_FOR_CYNAGORA) {
+                                       rc = cynagora_async_process(cynagora);
+                                       if (rc < 0)
+                                               emit(2, "asynchronous processing failed: %s\n", strerror(-rc));
+                               } else {
+                                       process_fork(e.data.fd);
+                               }
+                       }
+                       if (e.events & EPOLLHUP) {
+                               if (e.data.fd == FD_FOR_STDIN) {
+                                       break;
+                               } else if (e.data.fd == FD_FOR_CYNAGORA) {
+                                       break;
+                               } else {
+                                       terminate(e.data.fd);
+                               }
+                       }
                }
-               if (fds[0].revents & POLLHUP)
-                       break;
-               if (fds[1].revents & POLLHUP)
-                       break;
        }
        return 0;
 }