/*
- * Copyright (C) 2016, 2017 "IoT.bzh"
+ * Copyright (C) 2016, 2017, 2018 "IoT.bzh"
* Author José Bollo <jose.bollo@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
#include <errno.h>
#include <signal.h>
#include <unistd.h>
-#include <fcntl.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
-#include <systemd/sd-event.h>
-#include <systemd/sd-daemon.h>
-
-#include <uuid/uuid.h>
#include <json-c/json.h>
-#include <afb/afb-binding-v2.h>
-#include "afs-supervision.h"
-#include "afb-common.h"
-#include "afb-session.h"
+#define AFB_BINDING_VERSION 3
+#include <afb/afb-binding.h>
+
#include "afb-cred.h"
#include "afb-stub-ws.h"
#include "afb-api.h"
#include "afb-xreq.h"
-#include "afb-api-so-v2.h"
-#include "afb-api-ws.h"
+#include "afb-api-v3.h"
#include "afb-apiset.h"
-#include "jobs.h"
+#include "afb-fdev.h"
+
+#include "fdev.h"
#include "verbose.h"
#include "wrap-json.h"
-extern void afs_discover(const char *pattern, void (*callback)(void *closure, pid_t pid), void *closure);
+#include "afs-supervision.h"
+#include "afs-supervisor.h"
+#include "afs-discover.h"
/* supervised items */
struct supervised
};
/* api and apiset name */
-static const char supervision_apiname[] = AFS_SURPERVISION_APINAME_INTERNAL;
-static const char supervisor_apiname[] = "supervisor";
-
-/* the main apiset */
-struct afb_apiset *main_apiset;
+static const char supervision_apiname[] = AFS_SUPERVISION_APINAME;
+static const char supervisor_apiname[] = AFS_SUPERVISOR_APINAME;
/* the empty apiset */
static struct afb_apiset *empty_apiset;
/* supervision socket path */
-static const char supervision_socket_path[] = AFS_SURPERVISION_SOCKET;
+static const char supervision_socket_path[] = AFS_SUPERVISION_SOCKET;
+static struct fdev *supervision_fdev;
/* global mutex */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/* list of supervised daemons */
static struct supervised *superviseds;
+/* events */
+static afb_event_t event_add_pid;
+static afb_event_t event_del_pid;
+
/*************************************************************************************/
-static int afb_init_supervision_api();
/*************************************************************************************/
/* set */
memset(&asi, 0, sizeof asi);
- strcpy(asi.interface, AFS_SURPERVISION_INTERFACE_1);
+ strcpy(asi.interface, AFS_SUPERVISION_INTERFACE_1);
if (command)
strncpy(asi.extra, command, sizeof asi.extra - 1);
}
/*
- * checks whether the incomming supervised represented by its creds
+ * checks whether the incoming supervised represented by its creds
* is to be accepted or not.
* return 1 if yes or 0 otherwise.
*/
static void on_supervised_hangup(struct afb_stub_ws *stub)
{
struct supervised *s, **ps;
+
+ /* Search the supervised of the ws-stub */
pthread_mutex_lock(&mutex);
ps = &superviseds;
while ((s = *ps) && s->stub != stub)
ps = &s->next;
- if (s) {
+
+ /* unlink the supervised if found */
+ if (s)
*ps = s->next;
- afb_stub_ws_unref(stub);
- }
pthread_mutex_unlock(&mutex);
+
+ /* forgive the ws-stub */
+ afb_stub_ws_unref(stub);
+
+ /* forgive the supervised */
+ if (s) {
+ afb_event_push(event_del_pid, json_object_new_int((int)s->cred->pid));
+ afb_cred_unref(s->cred);
+ free(s);
+ }
}
/*
static int make_supervised(int fd, struct afb_cred *cred)
{
struct supervised *s;
+ struct fdev *fdev;
s = malloc(sizeof *s);
if (!s)
return -1;
+ fdev = afb_fdev_create(fd);
+ if (!fdev) {
+ free(s);
+ return -1;
+ }
+
s->cred = cred;
- s->stub = afb_stub_ws_create_client(fd, supervision_apiname, empty_apiset);
+ s->stub = afb_stub_ws_create_client(fdev, supervision_apiname, empty_apiset);
if (!s->stub) {
free(s);
return -1;
s->next = superviseds;
superviseds = s;
pthread_mutex_unlock(&mutex);
- afb_stub_ws_on_hangup(s->stub, on_supervised_hangup);
+ afb_stub_ws_set_on_hangup(s->stub, on_supervised_hangup);
return 0;
}
rc = send_initiator(fd, NULL);
if (!rc) {
rc = make_supervised(fd, cred);
- if (!rc)
+ if (!rc) {
+ afb_event_push(event_add_pid, json_object_new_int((int)cred->pid));
return;
+ }
}
}
afb_cred_unref(cred);
/*
* handle even on server socket
*/
-static int listening(sd_event_source *src, int fd, uint32_t revents, void *closure)
+static void listening(void *closure, uint32_t revents, struct fdev *fdev)
{
if ((revents & EPOLLIN) != 0)
- accept_supervision_link(fd);
+ accept_supervision_link((int)(intptr_t)closure);
if ((revents & EPOLLHUP) != 0) {
ERROR("supervision socket closed");
exit(1);
}
- return 0;
}
/*
struct supervised *s;
s = supervised_of_pid(pid);
- if (!s)
+ if (!s) {
+ (*(int*)closure)++;
kill(pid, SIGHUP);
+ }
}
-static void discover_supervised()
+int afs_supervisor_discover()
{
- afs_discover("afb-daemon", discovered_cb, NULL);
+ int n = 0;
+ afs_discover("afb-daemon", discovered_cb, &n);
+ return n;
}
-/**
- * initalize the supervision
- */
-static int init(const char *spec)
-{
- int rc, fd;
-
- /* check argument */
- if (!spec) {
- ERROR("invalid socket spec");
- return -1;
- }
-
- rc = afb_session_init(100, 600, "");
- /* TODO check that value */
-
- /* create the apisets */
- main_apiset = afb_apiset_create(supervisor_apiname, 0);
- if (!main_apiset) {
- ERROR("Can't create supervisor's apiset");
- return -1;
- }
- empty_apiset = afb_apiset_create(supervision_apiname, 0);
- if (!empty_apiset) {
- ERROR("Can't create supervision apiset");
- return -1;
- }
-
-
- /* init the main apiset */
- rc = afb_init_supervision_api();
- if (rc < 0) {
- ERROR("Can't create supervision's apiset: %m");
- return -1;
- }
+/*************************************************************************************/
- /* create the supervision socket */
- fd = create_supervision_socket(supervision_socket_path);
- if (fd < 0)
- return fd;
+static void f_subscribe(afb_req_t req)
+{
+ struct json_object *args = afb_req_json(req);
+ int revoke, ok;
- /* listen the socket */
- rc = listen(fd, 5);
- if (rc < 0) {
- ERROR("refused to listen on socket");
- return rc;
- }
+ revoke = json_object_is_type(args, json_type_boolean)
+ && !json_object_get_boolean(args);
- /* integrate the socket to the loop */
- rc = sd_event_add_io(afb_common_get_event_loop(),
- NULL, fd, EPOLLIN,
- listening, NULL);
- if (rc < 0) {
- ERROR("handling socket event isn't possible");
- return rc;
+ ok = 1;
+ if (!revoke) {
+ ok = !afb_req_subscribe(req, event_add_pid)
+ && !afb_req_subscribe(req, event_del_pid);
}
-
- /* adds the server socket */
- rc = afb_api_ws_add_server(spec, main_apiset);
- if (rc < 0) {
- ERROR("can't start the server socket");
- return -1;
+ if (revoke || !ok) {
+ afb_req_unsubscribe(req, event_add_pid);
+ afb_req_unsubscribe(req, event_del_pid);
}
- return 0;
+ afb_req_reply(req, NULL, ok ? NULL : "error", NULL);
}
-/* start should not be null but */
-static void start(int signum, void *arg)
-{
- char *xpath = arg;
- int rc;
-
- if (signum)
- exit(1);
-
- rc = init(xpath);
- if (rc)
- exit(1);
-
- sd_notify(1, "READY=1");
-
- discover_supervised();
-}
-
-/**
- * initalize the supervision
- */
-int main(int ac, char **av)
-{
- verbosity = Verbosity_Level_Debug;
- /* enter job processing */
- jobs_start(3, 0, 10, start, av[1]);
- WARNING("hoops returned from jobs_enter! [report bug]");
- return 1;
-}
-
-/*********************************************************************************************************/
-
-static struct afb_binding_data_v2 datav2;
-
-static void f_list(struct afb_req req)
+static void f_list(afb_req_t req)
{
char pid[50];
struct json_object *resu, *item;
afb_req_success(req, resu, NULL);
}
-static void propagate(struct afb_req req, const char *verb)
+static void f_discover(afb_req_t req)
+{
+ afs_supervisor_discover();
+ afb_req_success(req, NULL, NULL);
+}
+
+static void propagate(afb_req_t req, const char *verb)
{
struct afb_xreq *xreq;
struct json_object *args, *item;
struct supervised *s;
- struct afb_api api;
+ struct afb_api_item api;
int p;
- xreq = xreq_from_request(req.closure);
+ xreq = xreq_from_req_x2(req);
args = afb_xreq_json(xreq);
+
+ /* extract the pid */
if (!json_object_object_get_ex(args, "pid", &item)) {
- afb_xreq_fail(xreq, "no-pid", NULL);
+ afb_xreq_reply(xreq, NULL, "no-pid", NULL);
return;
}
errno = 0;
p = json_object_get_int(item);
if (!p && errno) {
- afb_xreq_fail(xreq, "bad-pid", NULL);
+ afb_xreq_reply(xreq, NULL, "bad-pid", NULL);
return;
}
+
+ /* get supervised of pid */
s = supervised_of_pid((pid_t)p);
if (!s) {
- afb_req_fail(req, "unknown-pid", NULL);
+ afb_req_reply(req, NULL, "unknown-pid", NULL);
return;
}
json_object_object_del(args, "pid");
+
+ /* replace the verb to call if needed */
if (verb)
- xreq->request.verb = verb;
+ xreq->request.called_verb = verb;
+
+ /* call it now */
api = afb_stub_ws_client_api(s->stub);
api.itf->call(api.closure, xreq);
}
-static void f_do(struct afb_req req)
+static void f_do(afb_req_t req)
{
propagate(req, NULL);
}
-static void f_config(struct afb_req req)
+static void f_config(afb_req_t req)
{
propagate(req, NULL);
}
-static void f_trace(struct afb_req req)
+static void f_trace(afb_req_t req)
{
propagate(req, NULL);
}
-static void f_sessions(struct afb_req req)
+static void f_sessions(afb_req_t req)
{
propagate(req, "slist");
}
-static void f_session_close(struct afb_req req)
+static void f_session_close(afb_req_t req)
{
propagate(req, "sclose");
}
-static void f_exit(struct afb_req req)
+static void f_exit(afb_req_t req)
{
propagate(req, NULL);
+ afb_req_success(req, NULL, NULL);
}
-static void f_debug_wait(struct afb_req req)
+static void f_debug_wait(afb_req_t req)
{
propagate(req, "wait");
+ afb_req_success(req, NULL, NULL);
}
-static void f_debug_break(struct afb_req req)
+static void f_debug_break(afb_req_t req)
{
propagate(req, "break");
+ afb_req_success(req, NULL, NULL);
+}
+
+/*************************************************************************************/
+
+/**
+ * initialize the supervisor
+ */
+static int init_supervisor(afb_api_t api)
+{
+ int rc, fd;
+
+ event_add_pid = afb_api_make_event(api, "add-pid");
+ if (!afb_event_is_valid(event_add_pid)) {
+ ERROR("Can't create added event");
+ return -1;
+ }
+
+ event_del_pid = afb_api_make_event(api, "del-pid");
+ if (!afb_event_is_valid(event_del_pid)) {
+ ERROR("Can't create deleted event");
+ return -1;
+ }
+
+ /* create an empty set for superviseds */
+ empty_apiset = afb_apiset_create(supervision_apiname, 0);
+ if (!empty_apiset) {
+ ERROR("Can't create supervision apiset");
+ return -1;
+ }
+
+ /* create the supervision socket */
+ fd = create_supervision_socket(supervision_socket_path);
+ if (fd < 0)
+ return fd;
+
+ /* listen the socket */
+ rc = listen(fd, 5);
+ if (rc < 0) {
+ ERROR("refused to listen on socket");
+ return rc;
+ }
+
+ /* integrate the socket to the loop */
+ supervision_fdev = afb_fdev_create(fd);
+ if (rc < 0) {
+ ERROR("handling socket event isn't possible");
+ return rc;
+ }
+ fdev_set_events(supervision_fdev, EPOLLIN);
+ fdev_set_callback(supervision_fdev, listening, (void*)(intptr_t)fd);
+
+ return 0;
}
-static const struct afb_auth _afb_auths_v2_supervision[] = {
+/*************************************************************************************/
+
+static const struct afb_auth _afb_auths_v2_supervisor[] = {
/* 0 */
{
.type = afb_auth_Permission,
}
};
-static const struct afb_verb_v2 _afb_verbs_v2_supervision[] = {
+static const struct afb_verb_v3 _afb_verbs_supervisor[] = {
+ {
+ .verb = "subscribe",
+ .callback = f_subscribe,
+ .auth = &_afb_auths_v2_supervisor[0],
+ .info = NULL,
+ .session = AFB_SESSION_CHECK_X2
+ },
{
.verb = "list",
.callback = f_list,
- .auth = &_afb_auths_v2_supervision[0],
+ .auth = &_afb_auths_v2_supervisor[0],
.info = NULL,
- .session = AFB_SESSION_NONE_V2
+ .session = AFB_SESSION_CHECK_X2
},
{
.verb = "config",
.callback = f_config,
- .auth = &_afb_auths_v2_supervision[0],
+ .auth = &_afb_auths_v2_supervisor[0],
.info = NULL,
- .session = AFB_SESSION_NONE_V2
+ .session = AFB_SESSION_CHECK_X2
},
{
.verb = "do",
.callback = f_do,
- .auth = &_afb_auths_v2_supervision[0],
+ .auth = &_afb_auths_v2_supervisor[0],
.info = NULL,
- .session = AFB_SESSION_NONE_V2
+ .session = AFB_SESSION_CHECK_X2
},
{
.verb = "trace",
.callback = f_trace,
- .auth = &_afb_auths_v2_supervision[0],
+ .auth = &_afb_auths_v2_supervisor[0],
.info = NULL,
- .session = AFB_SESSION_NONE_V2
+ .session = AFB_SESSION_CHECK_X2
},
{
.verb = "sessions",
.callback = f_sessions,
- .auth = &_afb_auths_v2_supervision[0],
+ .auth = &_afb_auths_v2_supervisor[0],
.info = NULL,
- .session = AFB_SESSION_NONE_V2
+ .session = AFB_SESSION_CHECK_X2
},
{
.verb = "session-close",
.callback = f_session_close,
- .auth = &_afb_auths_v2_supervision[0],
+ .auth = &_afb_auths_v2_supervisor[0],
.info = NULL,
- .session = AFB_SESSION_NONE_V2
+ .session = AFB_SESSION_CHECK_X2
},
{
.verb = "exit",
.callback = f_exit,
- .auth = &_afb_auths_v2_supervision[0],
+ .auth = &_afb_auths_v2_supervisor[0],
.info = NULL,
- .session = AFB_SESSION_NONE_V2
+ .session = AFB_SESSION_CHECK_X2
},
{
.verb = "debug-wait",
.callback = f_debug_wait,
- .auth = &_afb_auths_v2_supervision[0],
+ .auth = &_afb_auths_v2_supervisor[0],
.info = NULL,
- .session = AFB_SESSION_NONE_V2
+ .session = AFB_SESSION_CHECK_X2
},
{
.verb = "debug-break",
.callback = f_debug_break,
- .auth = &_afb_auths_v2_supervision[0],
+ .auth = &_afb_auths_v2_supervisor[0],
+ .info = NULL,
+ .session = AFB_SESSION_CHECK_X2
+ },
+ {
+ .verb = "discover",
+ .callback = f_discover,
+ .auth = &_afb_auths_v2_supervisor[0],
.info = NULL,
- .session = AFB_SESSION_NONE_V2
+ .session = AFB_SESSION_CHECK_X2
},
{ .verb = NULL }
};
-static const struct afb_binding_v2 _afb_binding_v2_supervision = {
+static const struct afb_binding_v3 _afb_binding_supervisor = {
.api = supervisor_apiname,
.specification = NULL,
.info = NULL,
- .verbs = _afb_verbs_v2_supervision,
+ .verbs = _afb_verbs_supervisor,
.preinit = NULL,
- .init = NULL,
+ .init = init_supervisor,
.onevent = NULL,
.noconcurrency = 0
};
-static int afb_init_supervision_api()
+int afs_supervisor_add(
+ struct afb_apiset *declare_set,
+ struct afb_apiset * call_set)
{
- return afb_api_so_v2_add_binding(&_afb_binding_v2_supervision, NULL, main_apiset, &datav2);
+ return -!afb_api_v3_from_binding(&_afb_binding_supervisor, declare_set, call_set);
}