Update copyright dates
[src/app-framework-binder.git] / src / afs-supervisor.c
index 8cf21cb..8291713 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2016, 2017 "IoT.bzh"
+ * Copyright (C) 2015-2020 "IoT.bzh"
  * Author José Bollo <jose.bollo@iot.bzh>
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
 #include <errno.h>
 #include <signal.h>
 #include <unistd.h>
-#include <fcntl.h>
 #include <pthread.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <sys/un.h>
 
-#include <systemd/sd-event.h>
-#include <systemd/sd-daemon.h>
-
-#include <uuid/uuid.h>
 #include <json-c/json.h>
-#include <afb/afb-binding-v2.h>
 
-#include "afs-supervision.h"
-#include "afb-common.h"
-#include "afb-session.h"
+#define AFB_BINDING_VERSION 3
+#include <afb/afb-binding.h>
+
 #include "afb-cred.h"
 #include "afb-stub-ws.h"
 #include "afb-api.h"
 #include "afb-xreq.h"
-#include "afb-api-so-v2.h"
-#include "afb-api-ws.h"
+#include "afb-api-v3.h"
 #include "afb-apiset.h"
-#include "jobs.h"
+#include "afb-fdev.h"
+#include "afb-socket.h"
+
+#include "fdev.h"
 #include "verbose.h"
 #include "wrap-json.h"
 
-extern void afs_discover(const char *pattern, void (*callback)(void *closure, pid_t pid), void *closure);
+#include "afs-supervision.h"
+#include "afs-supervisor.h"
+#include "afs-discover.h"
 
 /* supervised items */
 struct supervised
@@ -65,17 +63,15 @@ struct supervised
 };
 
 /* api and apiset name */
-static const char supervision_apiname[] = AFS_SURPERVISION_APINAME_INTERNAL;
-static const char supervisor_apiname[] = "supervisor";
-
-/* the main apiset */
-struct afb_apiset *main_apiset;
+static const char supervision_apiname[] = AFS_SUPERVISION_APINAME;
+static const char supervisor_apiname[] = AFS_SUPERVISOR_APINAME;
 
 /* the empty apiset */
 static struct afb_apiset *empty_apiset;
 
 /* supervision socket path */
-static const char supervision_socket_path[] = AFS_SURPERVISION_SOCKET;
+static const char supervision_socket_path[] = AFS_SUPERVISION_SOCKET;
+static struct fdev *supervision_fdev;
 
 /* global mutex */
 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -83,55 +79,14 @@ static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 /* list of supervised daemons */
 static struct supervised *superviseds;
 
-/*************************************************************************************/
-
-static int afb_init_supervision_api();
+/* events */
+static afb_event_t event_add_pid;
+static afb_event_t event_del_pid;
 
 /*************************************************************************************/
 
-/**
- * Creates the supervisor socket for 'path' and return it
- * return -1 in case of failure
- */
-static int create_supervision_socket(const char *path)
-{
-       int fd, rc;
-       struct sockaddr_un addr;
-       size_t length;
-
-       /* check the path's length */
-       length = strlen(path);
-       if (length >= 108) {
-               ERROR("Path name of supervision socket too long: %d", (int)length);
-               errno = ENAMETOOLONG;
-               return -1;
-       }
-
-       /* create a socket */
-       fd = socket(AF_UNIX, SOCK_STREAM, 0);
-       if (fd < 0) {
-               ERROR("Can't create socket: %m");
-               return fd;
-       }
 
-       /* setup the bind to a path */
-       memset(&addr, 0, sizeof addr);
-       addr.sun_family = AF_UNIX;
-       strcpy(addr.sun_path, path);
-       if (addr.sun_path[0] == '@')
-               addr.sun_path[0] = 0; /* abstract sockets */
-       else
-               unlink(path);
-
-       /* binds the socket to the path */
-       rc = bind(fd, (struct sockaddr *) &addr, (socklen_t)(sizeof addr));
-       if (rc < 0) {
-               ERROR("can't bind socket to %s", path);
-               close(fd);
-               return rc;
-       }
-       return fd;
-}
+/*************************************************************************************/
 
 /**
  * send on 'fd' an initiator with 'command'
@@ -145,7 +100,7 @@ static int send_initiator(int fd, const char *command)
 
        /* set  */
        memset(&asi, 0, sizeof asi);
-       strcpy(asi.interface, AFS_SURPERVISION_INTERFACE_1);
+       strcpy(asi.interface, AFS_SUPERVISION_INTERFACE_1);
        if (command)
                strncpy(asi.extra, command, sizeof asi.extra - 1);
 
@@ -163,7 +118,7 @@ static int send_initiator(int fd, const char *command)
 }
 
 /*
- * checks whether the incomming supervised represented by its creds
+ * checks whether the incoming supervised represented by its creds
  * is to be accepted or not.
  * return 1 if yes or 0 otherwise.
  */
@@ -175,15 +130,27 @@ static int should_accept(struct afb_cred *cred)
 static void on_supervised_hangup(struct afb_stub_ws *stub)
 {
        struct supervised *s, **ps;
+
+       /* Search the supervised of the ws-stub */
        pthread_mutex_lock(&mutex);
        ps = &superviseds;
        while ((s = *ps) && s->stub != stub)
                ps = &s->next;
-       if (s) {
+
+       /* unlink the supervised if found */
+       if (s)
                *ps = s->next;
-               afb_stub_ws_unref(stub);
-       }
        pthread_mutex_unlock(&mutex);
+
+       /* forgive the ws-stub */
+       afb_stub_ws_unref(stub);
+
+       /* forgive the supervised */
+       if (s) {
+               afb_event_push(event_del_pid, json_object_new_int((int)s->cred->pid));
+               afb_cred_unref(s->cred);
+               free(s);
+       }
 }
 
 /*
@@ -193,13 +160,20 @@ static void on_supervised_hangup(struct afb_stub_ws *stub)
 static int make_supervised(int fd, struct afb_cred *cred)
 {
        struct supervised *s;
+       struct fdev *fdev;
 
        s = malloc(sizeof *s);
        if (!s)
                return -1;
 
+       fdev = afb_fdev_create(fd);
+       if (!fdev) {
+               free(s);
+               return -1;
+       }
+
        s->cred = cred;
-       s->stub = afb_stub_ws_create_client(fd, supervision_apiname, empty_apiset);
+       s->stub = afb_stub_ws_create_client(fdev, supervision_apiname, empty_apiset);
        if (!s->stub) {
                free(s);
                return -1;
@@ -208,7 +182,7 @@ static int make_supervised(int fd, struct afb_cred *cred)
        s->next = superviseds;
        superviseds = s;
        pthread_mutex_unlock(&mutex);
-       afb_stub_ws_on_hangup(s->stub, on_supervised_hangup);
+       afb_stub_ws_set_on_hangup(s->stub, on_supervised_hangup);
        return 0;
 }
 
@@ -247,8 +221,10 @@ static void accept_supervision_link(int sock)
                        rc = send_initiator(fd, NULL);
                        if (!rc) {
                                rc = make_supervised(fd, cred);
-                               if (!rc)
+                               if (!rc) {
+                                       afb_event_push(event_add_pid, json_object_new_int((int)cred->pid));
                                        return;
+                               }
                        }
                }
                afb_cred_unref(cred);
@@ -259,15 +235,14 @@ static void accept_supervision_link(int sock)
 /*
  * handle even on server socket
  */
-static int listening(sd_event_source *src, int fd, uint32_t revents, void *closure)
+static void listening(void *closure, uint32_t revents, struct fdev *fdev)
 {
-       if ((revents & EPOLLIN) != 0)
-               accept_supervision_link(fd);
        if ((revents & EPOLLHUP) != 0) {
                ERROR("supervision socket closed");
                exit(1);
        }
-       return 0;
+       if ((revents & EPOLLIN) != 0)
+               accept_supervision_link((int)(intptr_t)closure);
 }
 
 /*
@@ -277,116 +252,42 @@ static void discovered_cb(void *closure, pid_t pid)
        struct supervised *s;
 
        s = supervised_of_pid(pid);
-       if (!s)
+       if (!s) {
+               (*(int*)closure)++;
                kill(pid, SIGHUP);
+       }
 }
 
-static void discover_supervised()
+int afs_supervisor_discover()
 {
-       afs_discover("afb-daemon", discovered_cb, NULL);
+       int n = 0;
+       afs_discover("afb-daemon", discovered_cb, &n);
+       return n;
 }
 
-/**
- * initalize the supervision
- */
-static int init(const char *spec)
-{
-       int rc, fd;
-
-       /* check argument */
-       if (!spec) {
-               ERROR("invalid socket spec");
-               return -1;
-       }
-
-       rc = afb_session_init(100, 600, "");
-       /* TODO check that value */
-
-       /* create the apisets */
-       main_apiset = afb_apiset_create(supervisor_apiname, 0);
-       if (!main_apiset) {
-               ERROR("Can't create supervisor's apiset");
-               return -1;
-       }
-       empty_apiset = afb_apiset_create(supervision_apiname, 0);
-       if (!empty_apiset) {
-               ERROR("Can't create supervision apiset");
-               return -1;
-       }
-
+/*************************************************************************************/
 
-       /* init the main apiset */
-       rc = afb_init_supervision_api();
-       if (rc < 0) {
-               ERROR("Can't create supervision's apiset: %m");
-               return -1;
-       }
+static void f_subscribe(afb_req_t req)
+{
+       struct json_object *args = afb_req_json(req);
+       int revoke, ok;
 
-       /* create the supervision socket */
-       fd = create_supervision_socket(supervision_socket_path);
-       if (fd < 0)
-               return fd;
-
-       /* listen the socket */
-       rc = listen(fd, 5);
-       if (rc < 0) {
-               ERROR("refused to listen on socket");
-               return rc;
-       }
+       revoke = json_object_is_type(args, json_type_boolean)
+               && !json_object_get_boolean(args);
 
-       /* integrate the socket to the loop */
-       rc = sd_event_add_io(afb_common_get_event_loop(),
-                               NULL, fd, EPOLLIN,
-                               listening, NULL);
-       if (rc < 0) {
-               ERROR("handling socket event isn't possible");
-               return rc;
+       ok = 1;
+       if (!revoke) {
+               ok = !afb_req_subscribe(req, event_add_pid)
+                       && !afb_req_subscribe(req, event_del_pid);
        }
-
-       /* adds the server socket */
-       rc = afb_api_ws_add_server(spec, main_apiset);
-       if (rc < 0) {
-               ERROR("can't start the server socket");
-               return -1;
+       if (revoke || !ok) {
+               afb_req_unsubscribe(req, event_add_pid);
+               afb_req_unsubscribe(req, event_del_pid);
        }
-       return 0;
-}
-
-/* start should not be null but */
-static void start(int signum, void *arg)
-{
-       char *xpath = arg;
-       int rc;
-
-       if (signum)
-               exit(1);
-
-       rc = init(xpath);
-       if (rc)
-               exit(1);
-
-       sd_notify(1, "READY=1");
-
-       discover_supervised();
-}
-
-/**
- * initalize the supervision
- */
-int main(int ac, char **av)
-{
-       verbosity = Verbosity_Level_Debug;
-       /* enter job processing */
-       jobs_start(3, 0, 10, start, av[1]);
-       WARNING("hoops returned from jobs_enter! [report bug]");
-       return 1;
+       afb_req_reply(req, NULL, ok ? NULL : "error", NULL);
 }
 
-/*********************************************************************************************************/
-
-static struct afb_binding_data_v2 datav2;
-
-static void f_list(struct afb_req req)
+static void f_list(afb_req_t req)
 {
        char pid[50];
        struct json_object *resu, *item;
@@ -411,79 +312,136 @@ static void f_list(struct afb_req req)
        afb_req_success(req, resu, NULL);
 }
 
-static void propagate(struct afb_req req, const char *verb)
+static void f_discover(afb_req_t req)
+{
+       afs_supervisor_discover();
+       afb_req_success(req, NULL, NULL);
+}
+
+static void propagate(afb_req_t req, const char *verb)
 {
        struct afb_xreq *xreq;
        struct json_object *args, *item;
        struct supervised *s;
-       struct afb_api api;
+       struct afb_api_item api;
        int p;
 
-       xreq = xreq_from_request(req.closure);
+       xreq = xreq_from_req_x2(req);
        args = afb_xreq_json(xreq);
+
+       /* extract the pid */
        if (!json_object_object_get_ex(args, "pid", &item)) {
-               afb_xreq_fail(xreq, "no-pid", NULL);
+               afb_xreq_reply(xreq, NULL, "no-pid", NULL);
                return;
        }
        errno = 0;
        p = json_object_get_int(item);
        if (!p && errno) {
-               afb_xreq_fail(xreq, "bad-pid", NULL);
+               afb_xreq_reply(xreq, NULL, "bad-pid", NULL);
                return;
        }
+
+       /* get supervised of pid */
        s = supervised_of_pid((pid_t)p);
        if (!s) {
-               afb_req_fail(req, "unknown-pid", NULL);
+               afb_req_reply(req, NULL, "unknown-pid", NULL);
                return;
        }
        json_object_object_del(args, "pid");
+
+       /* replace the verb to call if needed */
        if (verb)
-               xreq->request.verb = verb;
+               xreq->request.called_verb = verb;
+
+       /* call it now */
        api = afb_stub_ws_client_api(s->stub);
        api.itf->call(api.closure, xreq);
 }
 
-static void f_do(struct afb_req req)
+static void f_do(afb_req_t req)
 {
        propagate(req, NULL);
 }
 
-static void f_config(struct afb_req req)
+static void f_config(afb_req_t req)
 {
        propagate(req, NULL);
 }
 
-static void f_trace(struct afb_req req)
+static void f_trace(afb_req_t req)
 {
        propagate(req, NULL);
 }
 
-static void f_sessions(struct afb_req req)
+static void f_sessions(afb_req_t req)
 {
        propagate(req, "slist");
 }
 
-static void f_session_close(struct afb_req req)
+static void f_session_close(afb_req_t req)
 {
        propagate(req, "sclose");
 }
 
-static void f_exit(struct afb_req req)
+static void f_exit(afb_req_t req)
 {
        propagate(req, NULL);
+       afb_req_success(req, NULL, NULL);
 }
 
-static void f_debug_wait(struct afb_req req)
+static void f_debug_wait(afb_req_t req)
 {
        propagate(req, "wait");
+       afb_req_success(req, NULL, NULL);
 }
 
-static void f_debug_break(struct afb_req req)
+static void f_debug_break(afb_req_t req)
 {
        propagate(req, "break");
+       afb_req_success(req, NULL, NULL);
+}
+
+/*************************************************************************************/
+
+/**
+ * initialize the supervisor
+ */
+static int init_supervisor(afb_api_t api)
+{
+       event_add_pid = afb_api_make_event(api, "add-pid");
+       if (!afb_event_is_valid(event_add_pid)) {
+               ERROR("Can't create added event");
+               return -1;
+       }
+
+       event_del_pid = afb_api_make_event(api, "del-pid");
+       if (!afb_event_is_valid(event_del_pid)) {
+               ERROR("Can't create deleted event");
+               return -1;
+       }
+
+       /* create an empty set for superviseds */
+       empty_apiset = afb_apiset_create(supervision_apiname, 0);
+       if (!empty_apiset) {
+               ERROR("Can't create supervision apiset");
+               return -1;
+       }
+
+       /* create the supervision socket */
+       supervision_fdev = afb_socket_open_fdev(supervision_socket_path, 1);
+       if (!supervision_fdev)
+               return -1;
+
+       fdev_set_events(supervision_fdev, EPOLLIN);
+       fdev_set_callback(supervision_fdev, listening,
+                         (void*)(intptr_t)fdev_fd(supervision_fdev));
+
+       return 0;
 }
 
-static const struct afb_auth _afb_auths_v2_supervision[] = {
+/*************************************************************************************/
+
+static const struct afb_auth _afb_auths_v2_supervisor[] = {
        /* 0 */
        {
                .type = afb_auth_Permission,
@@ -491,86 +449,102 @@ static const struct afb_auth _afb_auths_v2_supervision[] = {
        }
 };
 
-static const struct afb_verb_v2 _afb_verbs_v2_supervision[] = {
+static const struct afb_verb_v3 _afb_verbs_supervisor[] = {
+    {
+        .verb = "subscribe",
+        .callback = f_subscribe,
+        .auth = &_afb_auths_v2_supervisor[0],
+        .info = NULL,
+        .session = AFB_SESSION_CHECK_X2
+    },
     {
         .verb = "list",
         .callback = f_list,
-        .auth = &_afb_auths_v2_supervision[0],
+        .auth = &_afb_auths_v2_supervisor[0],
         .info = NULL,
-        .session = AFB_SESSION_NONE_V2
+        .session = AFB_SESSION_CHECK_X2
     },
     {
         .verb = "config",
         .callback = f_config,
-        .auth = &_afb_auths_v2_supervision[0],
+        .auth = &_afb_auths_v2_supervisor[0],
         .info = NULL,
-        .session = AFB_SESSION_NONE_V2
+        .session = AFB_SESSION_CHECK_X2
     },
     {
         .verb = "do",
         .callback = f_do,
-        .auth = &_afb_auths_v2_supervision[0],
+        .auth = &_afb_auths_v2_supervisor[0],
         .info = NULL,
-        .session = AFB_SESSION_NONE_V2
+        .session = AFB_SESSION_CHECK_X2
     },
     {
         .verb = "trace",
         .callback = f_trace,
-        .auth = &_afb_auths_v2_supervision[0],
+        .auth = &_afb_auths_v2_supervisor[0],
         .info = NULL,
-        .session = AFB_SESSION_NONE_V2
+        .session = AFB_SESSION_CHECK_X2
     },
     {
         .verb = "sessions",
         .callback = f_sessions,
-        .auth = &_afb_auths_v2_supervision[0],
+        .auth = &_afb_auths_v2_supervisor[0],
         .info = NULL,
-        .session = AFB_SESSION_NONE_V2
+        .session = AFB_SESSION_CHECK_X2
     },
     {
         .verb = "session-close",
         .callback = f_session_close,
-        .auth = &_afb_auths_v2_supervision[0],
+        .auth = &_afb_auths_v2_supervisor[0],
         .info = NULL,
-        .session = AFB_SESSION_NONE_V2
+        .session = AFB_SESSION_CHECK_X2
     },
     {
         .verb = "exit",
         .callback = f_exit,
-        .auth = &_afb_auths_v2_supervision[0],
+        .auth = &_afb_auths_v2_supervisor[0],
         .info = NULL,
-        .session = AFB_SESSION_NONE_V2
+        .session = AFB_SESSION_CHECK_X2
     },
     {
         .verb = "debug-wait",
         .callback = f_debug_wait,
-        .auth = &_afb_auths_v2_supervision[0],
+        .auth = &_afb_auths_v2_supervisor[0],
         .info = NULL,
-        .session = AFB_SESSION_NONE_V2
+        .session = AFB_SESSION_CHECK_X2
     },
     {
         .verb = "debug-break",
         .callback = f_debug_break,
-        .auth = &_afb_auths_v2_supervision[0],
+        .auth = &_afb_auths_v2_supervisor[0],
+        .info = NULL,
+        .session = AFB_SESSION_CHECK_X2
+    },
+    {
+        .verb = "discover",
+        .callback = f_discover,
+        .auth = &_afb_auths_v2_supervisor[0],
         .info = NULL,
-        .session = AFB_SESSION_NONE_V2
+        .session = AFB_SESSION_CHECK_X2
     },
     { .verb = NULL }
 };
 
-static const struct afb_binding_v2 _afb_binding_v2_supervision = {
+static const struct afb_binding_v3 _afb_binding_supervisor = {
     .api = supervisor_apiname,
     .specification = NULL,
     .info = NULL,
-    .verbs = _afb_verbs_v2_supervision,
+    .verbs = _afb_verbs_supervisor,
     .preinit = NULL,
-    .init = NULL,
+    .init = init_supervisor,
     .onevent = NULL,
     .noconcurrency = 0
 };
 
-static int afb_init_supervision_api()
+int afs_supervisor_add(
+               struct afb_apiset *declare_set,
+               struct afb_apiset * call_set)
 {
-       return afb_api_so_v2_add_binding(&_afb_binding_v2_supervision, NULL, main_apiset, &datav2);
+       return -!afb_api_v3_from_binding(&_afb_binding_supervisor, declare_set, call_set);
 }