afs-supervisor: Add discovery mechanic
[src/app-framework-binder.git] / src / afb-supervision.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19 #define AFB_BINDING_PRAGMA_NO_VERBOSE_MACRO
20
21 #include <string.h>
22 #include <errno.h>
23 #include <signal.h>
24 #include <unistd.h>
25 #include <fcntl.h>
26 #include <pthread.h>
27 #include <sys/types.h>
28 #include <sys/socket.h>
29 #include <sys/un.h>
30
31 #include <json-c/json.h>
32 #include <afb/afb-binding-v2.h>
33
34 #include "afb-cred.h"
35 #include "afb-api.h"
36 #include "afb-apiset.h"
37 #include "afb-api-so-v2.h"
38 #include "afb-xreq.h"
39 #include "afb-trace.h"
40 #include "afb-session.h"
41 #include "afb-config.h"
42 #include "afb-supervision.h"
43 #include "afs-supervision.h"
44 #include "afb-stub-ws.h"
45 #include "afb-debug.h"
46 #include "verbose.h"
47 #include "wrap-json.h"
48 #include "jobs.h"
49
50 extern struct afb_config *main_config;
51
52 /* api and apiset name */
53 static const char supervision_apiname[] = AFS_SURPERVISION_APINAME_INTERNAL;
54
55 /* path of the supervision socket */
56 static const char supervisor_socket_path[] = AFS_SURPERVISION_SOCKET;
57
58 /* mutual exclusion */
59 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
60
61 /* the standard apiset */
62 extern struct afb_apiset *main_apiset;
63
64 /* the supervision apiset (not exported) */
65 static struct afb_apiset *supervision_apiset;
66
67 /* local api implementation */
68 static void on_supervision_call(void *closure, struct afb_xreq *xreq);
69 static struct afb_api_itf supervision_api_itf =
70 {
71         .call = on_supervision_call
72 };
73
74 /* the supervisor link */
75 static struct afb_stub_ws *supervisor;
76
77 /* the trace api */
78 static struct afb_trace *trace;
79
/**
 * Opens a stream socket connected to the unix socket of given path.
 *
 * A path starting with '@' designates an abstract socket: the '@' is
 * replaced by a nul byte in the connection address.
 *
 * @param path  the nul terminated path of the socket to connect to
 *
 * @return the connected file descriptor on success or, on failure, a
 *         negative value with errno set (ENAMETOOLONG when the path
 *         doesn't fit the address of a unix socket)
 */
static int open_supervisor_socket(const char *path)
{
	int fd, rc, saved_errno;
	struct sockaddr_un addr;
	size_t length;

	/* check path length against the real capacity, the nul must fit too */
	length = strlen(path);
	if (length >= sizeof addr.sun_path) {
		errno = ENAMETOOLONG;
		return -1;
	}

	/* create the unix socket */
	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (fd < 0)
		return fd;

	/* prepare the connection address (the memset provides the terminating nul) */
	memset(&addr, 0, sizeof addr);
	addr.sun_family = AF_UNIX;
	memcpy(addr.sun_path, path, length);
	if (addr.sun_path[0] == '@')
		addr.sun_path[0] = 0; /* implement abstract sockets */

	/*
	 * connect the socket: the full size of the address is given, the name
	 * of an abstract socket is thus padded with nul bytes up to the end
	 * of sun_path
	 */
	rc = connect(fd, (struct sockaddr *) &addr, (socklen_t)(sizeof addr));
	if (rc < 0) {
		/* callers report the failure using %m: don't let close alter errno */
		saved_errno = errno;
		close(fd);
		errno = saved_errno;
		return rc;
	}
	return fd;
}
114
115 static void disconnect_supervisor()
116 {
117         struct afb_stub_ws *s;
118         struct afb_trace *t;
119
120         INFO("Disconnecting supervision");
121         s = __atomic_exchange_n(&supervisor, NULL, __ATOMIC_RELAXED);
122         t = __atomic_exchange_n(&trace, NULL, __ATOMIC_RELAXED);
123         if (s)
124                 afb_stub_ws_unref(s);
125         if (t)
126                 afb_trace_unref(t);
127 }
128
129 static void on_supervisor_hangup(struct afb_stub_ws *s)
130 {
131         if (s && s == supervisor)
132                 disconnect_supervisor();
133 }
134
135 /* try to connect to supervisor */
136 static void try_connect_supervisor()
137 {
138         int fd;
139         ssize_t srd;
140         struct afs_supervision_initiator initiator;
141
142         /* get the mutex */
143         pthread_mutex_lock(&mutex);
144
145         /* needs to connect? */
146         if (supervisor || !supervision_apiset)
147                 goto end;
148
149         /* check existing path */
150         if (supervisor_socket_path[0] != '@' && access(supervisor_socket_path, F_OK)) {
151                 NOTICE("Can't acces socket path %s: %m", supervisor_socket_path);
152                 goto end;
153         }
154
155         /* socket connection */
156         fd = open_supervisor_socket(supervisor_socket_path);
157         if (fd < 0) {
158                 NOTICE("Can't connect supervision socket to %s: %m", supervisor_socket_path);
159                 goto end;
160         }
161
162         /* negociation */
163         do { srd = read(fd, &initiator, sizeof initiator); } while(srd < 0 && errno == EINTR);
164         if (srd < 0) {
165                 NOTICE("Can't read supervisor %s: %m", supervisor_socket_path);
166                 goto end2;
167         }
168         if ((size_t)srd != sizeof initiator) {
169                 ERROR("When reading supervisor %s: %m", supervisor_socket_path);
170                 goto end2;
171         }
172         if (strnlen(initiator.interface, sizeof initiator.interface) == sizeof initiator.interface) {
173                 ERROR("Bad interface of supervisor %s", supervisor_socket_path);
174                 goto end2;
175         }
176         if (strcmp(initiator.interface, AFS_SURPERVISION_INTERFACE_1)) {
177                 ERROR("Unknown interface %s for supervisor %s", initiator.interface, supervisor_socket_path);
178                 goto end2;
179         }
180         if (strnlen(initiator.extra, sizeof initiator.extra) == sizeof initiator.extra) {
181                 ERROR("Bad extra of supervisor %s", supervisor_socket_path);
182                 goto end2;
183         }
184
185         /* interprets extras */
186         if (!strcmp(initiator.extra, "CLOSE")) {
187                 INFO("Supervisor asks to CLOSE");
188                 goto end2;
189         }
190         if (!strcmp(initiator.extra, "WAIT")) {
191                 afb_debug_wait("supervisor");
192         }
193         if (!strcmp(initiator.extra, "BREAK")) {
194                 afb_debug_break("supervisor");
195         }
196
197         /* make the supervisor link */
198         supervisor = afb_stub_ws_create_server(fd, supervision_apiname, supervision_apiset);
199         if (!supervisor) {
200                 ERROR("Creation of supervisor failed: %m");
201                 goto end2;
202         }
203         afb_stub_ws_on_hangup(supervisor, on_supervisor_hangup);
204
205         /* successful termination */
206         goto end;
207
208 end2:
209         close(fd);
210 end:
211         pthread_mutex_unlock(&mutex);
212 }
213
/*
 * Job queued by the SIGHUP handler: tries to connect to the supervisor.
 * Neither 'signum' nor 'args' is used.
 */
static void try_connect_supervisor_job(int signum, void *args)
{
	(void)signum;
	(void)args;

	INFO("Try to connect supervisor after SIGHUP");
	try_connect_supervisor();
}
219
220 static void on_sighup(int signum)
221 {
222         INFO("Supervision received a SIGHUP");
223         jobs_queue(NULL, 0, try_connect_supervisor_job, NULL);
224 }
225
226 /**
227  * initalize the supervision
228  */
229 int afb_supervision_init()
230 {
231         int rc;
232         struct sigaction sa;
233
234         /* don't reinit */
235         if (supervision_apiset)
236                 return 0;
237
238         /* create the apiset */
239         supervision_apiset = afb_apiset_create(supervision_apiname, 0);
240         if (!supervision_apiset) {
241                 ERROR("Can't create supervision's apiset");
242                 return -1;
243         }
244
245         /* init the apiset */
246         rc = afb_apiset_add(supervision_apiset, supervision_apiname,
247                         (struct afb_api){ .itf = &supervision_api_itf, .closure = NULL});
248         if (rc < 0) {
249                 ERROR("Can't create supervision's apiset: %m");
250                 afb_apiset_unref(supervision_apiset);
251                 supervision_apiset = NULL;
252                 return rc;
253         }
254
255         /* get SIGHUP */
256         memset(&sa, 0, sizeof sa);
257         sa.sa_handler = on_sighup;
258         rc = sigaction(SIGHUP, &sa, NULL);
259         if (rc < 0)
260                 ERROR("Can't connect supervision to SIGHUP: %m");
261
262         /* connect to supervision */
263         try_connect_supervisor();
264         return 0;
265 }
266
/******************************************************************************
**** Helpers of the monitoring verbs
******************************************************************************/
/*
 * Callback of afb_session_foreach for the verb "slist": adds to the JSON
 * object given as 'closure' an entry whose key is the uuid of 'session'
 * and whose value is the object { "token": <token of the session> }.
 *
 * When the value can't be built, the session is recorded with a null value.
 */
static void slist(void *closure, struct afb_session *session)
{
	struct json_object *list = closure;
	struct json_object *item = NULL;

	/* never hand an indeterminate pointer to json-c when packing failed */
	if (wrap_json_pack(&item, "{ss}", "token", afb_session_token(session)) != 0)
		item = NULL;
	json_object_object_add(list, afb_session_uuid(session), item);
}
278
279 /******************************************************************************
280 **** Implementation monitoring verbs
281 ******************************************************************************/
282
/* indexes of the verbs in the table 'verbs' */
enum {
	Break,
	Config,
	Do,
	Exit,
	Sclose,
	Slist,
	Trace,
	Wait
};

/* names of the verbs, each at the index given by the enumeration above */
static const char *verbs[] = {
	[Break]  = "break",
	[Config] = "config",
	[Do]     = "do",
	[Exit]   = "exit",
	[Sclose] = "sclose",
	[Slist]  = "slist",
	[Trace]  = "trace",
	[Wait]   = "wait"
};
286
287
288 static void on_supervision_call(void *closure, struct afb_xreq *xreq)
289 {
290         int i, rc;
291         struct json_object *args, *drop, *add, *sub, *list;
292         const char *api, *verb, *uuid;
293         struct afb_session *session;
294         const struct afb_api *xapi;
295         struct afb_req req;
296
297         /* search the verb */
298         i = (int)(sizeof verbs / sizeof *verbs);
299         while(--i >= 0 && strcasecmp(verbs[i], xreq->request.verb));
300         if (i < 0) {
301                 afb_xreq_fail_unknown_verb(xreq);
302                 return;
303         }
304
305         /* process */
306         args = afb_xreq_json(xreq);
307         switch(i) {
308         case Exit:
309                 i = 0;
310                 if (wrap_json_unpack(args, "i", &i))
311                         wrap_json_unpack(args, "{si}", "code", &i);
312                 ERROR("existing from supervision with code %d -> %d", i, i & 127);
313                 exit(i & 127);
314                 break;
315         case Sclose:
316                 uuid = NULL;
317                 if (wrap_json_unpack(args, "s", &uuid))
318                         wrap_json_unpack(args, "{ss}", "uuid", &uuid);
319                 if (!uuid)
320                         afb_xreq_fail(xreq, "invalid", NULL);
321                 else {
322                         session = afb_session_search(uuid);
323                         if (!session)
324                                 afb_xreq_fail(xreq, "not-found", NULL);
325                         else {
326                                 afb_session_close(session);
327                                 afb_session_unref(session);
328                                 afb_session_purge();
329                                 afb_xreq_success(xreq, NULL, NULL);
330                         }
331                 }
332                 break;
333         case Slist:
334                 list = json_object_new_object();
335                 afb_session_foreach(slist, list);
336                 afb_xreq_success(xreq, list, NULL);
337                 break;
338         case Config:
339                 afb_xreq_success(xreq, afb_config_json(main_config), NULL);
340                 break;
341         case Trace:
342                 if (!trace)
343                         trace = afb_trace_create(supervision_apiname, NULL /* not bound to any session */);
344
345                 req = afb_xreq_unstore((struct afb_stored_req*)xreq);
346                 wrap_json_unpack(args, "{s?o s?o}", "add", &add, "drop", &drop);
347                 if (add) {
348                         rc = afb_trace_add(req, add, trace);
349                         if (rc)
350                                 return;
351                 }
352                 if (drop) {
353                         rc = afb_trace_drop(req, drop, trace);
354                         if (rc)
355                                 return;
356                 }
357                 afb_req_success(req, NULL, NULL);
358                 break;
359         case Do:
360                 sub = NULL;
361                 if (wrap_json_unpack(args, "{ss ss s?o*}", "api", &api, "verb", &verb, "args", &sub))
362                         afb_xreq_fail(xreq, "error", "bad request");
363                 else {
364                         xapi = afb_apiset_lookup_started(main_apiset, api, 1);
365                         if (!xapi)
366                                 afb_xreq_fail_unknown_api(xreq);
367                         else {
368                                 afb_cred_unref(xreq->cred);
369                                 xreq->cred = NULL;
370                                 xreq->request.api = api;
371                                 xreq->request.verb = verb;
372                                 xreq->json = json_object_get(sub);
373                                 xapi->itf->call(xapi->closure, xreq);
374                                 json_object_put(args);
375                         }
376                 }
377                 break;
378         case Wait:
379                 afb_req_success(req, NULL, NULL);
380                 afb_debug_wait("supervisor");
381                 break;
382         case Break:
383                 afb_req_success(req, NULL, NULL);
384                 afb_debug_break("supervisor");
385                 break;
386         }
387 }
388