Fix client disconnection close
[src/app-framework-binder.git] / src / afs-supervisor.c
1 /*
2  * Copyright (C) 2016-2019 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdio.h>
21 #include <string.h>
22 #include <errno.h>
23 #include <signal.h>
24 #include <unistd.h>
25 #include <pthread.h>
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <sys/un.h>
29
30 #include <json-c/json.h>
31
32 #define AFB_BINDING_VERSION 3
33 #include <afb/afb-binding.h>
34
35 #include "afb-cred.h"
36 #include "afb-stub-ws.h"
37 #include "afb-api.h"
38 #include "afb-xreq.h"
39 #include "afb-api-v3.h"
40 #include "afb-apiset.h"
41 #include "afb-fdev.h"
42
43 #include "fdev.h"
44 #include "verbose.h"
45 #include "wrap-json.h"
46
47 #include "afs-supervision.h"
48 #include "afs-supervisor.h"
49 #include "afs-discover.h"
50
/*
 * Record describing one supervised daemon.
 * The records are chained together through 'next'.
 */
struct supervised
{
        struct supervised *next;   /* following record of the list */
        struct afb_cred *cred;     /* credentials of the supervised */
        struct afb_stub_ws *stub;  /* connection with the supervised */
};
63
64 /* api and apiset name */
65 static const char supervision_apiname[] = AFS_SUPERVISION_APINAME;
66 static const char supervisor_apiname[] = AFS_SUPERVISOR_APINAME;
67
68 /* the empty apiset */
69 static struct afb_apiset *empty_apiset;
70
71 /* supervision socket path */
72 static const char supervision_socket_path[] = AFS_SUPERVISION_SOCKET;
73 static struct fdev *supervision_fdev;
74
75 /* global mutex */
76 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
77
78 /* list of supervised daemons */
79 static struct supervised *superviseds;
80
81 /* events */
82 static afb_event_t event_add_pid;
83 static afb_event_t event_del_pid;
84
85 /*************************************************************************************/
86
87
88 /*************************************************************************************/
89
/**
 * Creates the supervision socket bound to 'path'.
 *
 * A path starting with '@' designates an abstract socket: its first
 * byte is replaced by a nul. For any other path, the entry of the
 * filesystem is unlinked before binding.
 *
 * @param path  the path of the socket to create
 *
 * @return the file descriptor of the bound socket, or -1 in case of
 *         failure (errno is set to ENAMETOOLONG when 'path' doesn't
 *         fit the socket address)
 */
static int create_supervision_socket(const char *path)
{
        int fd, rc;
        struct sockaddr_un addr;
        size_t length;

        /* check the path's length against the real capacity of sun_path */
        length = strlen(path);
        if (length >= sizeof addr.sun_path) {
                ERROR("Path name of supervision socket too long: %d", (int)length);
                errno = ENAMETOOLONG;
                return -1;
        }

        /* create a socket */
        fd = socket(AF_UNIX, SOCK_STREAM, 0);
        if (fd < 0) {
                ERROR("Can't create socket: %m");
                return fd;
        }

        /* setup the bind to a path (the length was checked above) */
        memset(&addr, 0, sizeof addr);
        addr.sun_family = AF_UNIX;
        memcpy(addr.sun_path, path, length + 1);
        if (addr.sun_path[0] == '@')
                addr.sun_path[0] = 0; /* abstract sockets */
        else
                unlink(path);

        /* binds the socket to the path */
        rc = bind(fd, (struct sockaddr *) &addr, (socklen_t)(sizeof addr));
        if (rc < 0) {
                ERROR("can't bind socket to %s", path);
                close(fd);
                return rc;
        }
        return fd;
}
133
134 /**
135  * send on 'fd' an initiator with 'command'
136  * return 0 on success or -1 on failure
137  */
138 static int send_initiator(int fd, const char *command)
139 {
140         int rc;
141         ssize_t swr;
142         struct afs_supervision_initiator asi;
143
144         /* set  */
145         memset(&asi, 0, sizeof asi);
146         strcpy(asi.interface, AFS_SUPERVISION_INTERFACE_1);
147         if (command)
148                 strncpy(asi.extra, command, sizeof asi.extra - 1);
149
150         /* send the initiator */
151         swr = write(fd, &asi, sizeof asi);
152         if (swr < 0) {
153                 ERROR("Can't send initiator: %m");
154                 rc = -1;
155         } else if (swr < sizeof asi) {
156                 ERROR("Sending incomplete initiator: %m");
157                 rc = -1;
158         } else
159                 rc = 0;
160         return rc;
161 }
162
163 /*
164  * checks whether the incoming supervised represented by its creds
165  * is to be accepted or not.
166  * return 1 if yes or 0 otherwise.
167  */
168 static int should_accept(struct afb_cred *cred)
169 {
170         return cred && cred->pid != getpid(); /* not me! */
171 }
172
173 static void on_supervised_hangup(struct afb_stub_ws *stub)
174 {
175         struct supervised *s, **ps;
176
177         /* Search the supervised of the ws-stub */
178         pthread_mutex_lock(&mutex);
179         ps = &superviseds;
180         while ((s = *ps) && s->stub != stub)
181                 ps = &s->next;
182
183         /* unlink the supervised if found */
184         if (s)
185                 *ps = s->next;
186         pthread_mutex_unlock(&mutex);
187
188         /* forgive the ws-stub */
189         afb_stub_ws_unref(stub);
190
191         /* forgive the supervised */
192         if (s) {
193                 afb_event_push(event_del_pid, json_object_new_int((int)s->cred->pid));
194                 afb_cred_unref(s->cred);
195                 free(s);
196         }
197 }
198
199 /*
200  * create a supervised for socket 'fd' and 'cred'
201  * return 0 in case of success or -1 in case of error
202  */
203 static int make_supervised(int fd, struct afb_cred *cred)
204 {
205         struct supervised *s;
206         struct fdev *fdev;
207
208         s = malloc(sizeof *s);
209         if (!s)
210                 return -1;
211
212         fdev = afb_fdev_create(fd);
213         if (!fdev) {
214                 free(s);
215                 return -1;
216         }
217
218         s->cred = cred;
219         s->stub = afb_stub_ws_create_client(fdev, supervision_apiname, empty_apiset);
220         if (!s->stub) {
221                 free(s);
222                 return -1;
223         }
224         pthread_mutex_lock(&mutex);
225         s->next = superviseds;
226         superviseds = s;
227         pthread_mutex_unlock(&mutex);
228         afb_stub_ws_set_on_hangup(s->stub, on_supervised_hangup);
229         return 0;
230 }
231
232 /**
233  * Search the supervised of 'pid', return it or NULL.
234  */
235 static struct supervised *supervised_of_pid(pid_t pid)
236 {
237         struct supervised *s;
238
239         pthread_mutex_lock(&mutex);
240         s = superviseds;
241         while (s && pid != s->cred->pid)
242                 s = s->next;
243         pthread_mutex_unlock(&mutex);
244
245         return s;
246 }
247
248 /*
249  * handles incoming connection on 'sock'
250  */
251 static void accept_supervision_link(int sock)
252 {
253         int rc, fd;
254         struct sockaddr addr;
255         socklen_t lenaddr;
256         struct afb_cred *cred;
257
258         lenaddr = (socklen_t)sizeof addr;
259         fd = accept(sock, &addr, &lenaddr);
260         if (fd >= 0) {
261                 cred = afb_cred_create_for_socket(fd);
262                 rc = should_accept(cred);
263                 if (rc) {
264                         rc = send_initiator(fd, NULL);
265                         if (!rc) {
266                                 rc = make_supervised(fd, cred);
267                                 if (!rc) {
268                                         afb_event_push(event_add_pid, json_object_new_int((int)cred->pid));
269                                         return;
270                                 }
271                         }
272                 }
273                 afb_cred_unref(cred);
274                 close(fd);
275         }
276 }
277
/*
 * Handles the events of the server socket.
 * A hangup is fatal: the process exits with status 1.
 * On input, the pending connection is accepted; 'closure' carries
 * the file descriptor of the listening socket.
 */
static void listening(void *closure, uint32_t revents, struct fdev *fdev)
{
        if (revents & EPOLLHUP) {
                ERROR("supervision socket closed");
                exit(1);
        }
        if (revents & EPOLLIN)
                accept_supervision_link((int)(intptr_t)closure);
}
290
/*
 * Callback of the discovery, called with the 'pid' of a discovered
 * process. When that pid isn't already supervised, the counter
 * pointed to by 'closure' is incremented and SIGHUP is sent to the
 * process.
 */
static void discovered_cb(void *closure, pid_t pid)
{
        int *count = closure;

        if (supervised_of_pid(pid) != NULL)
                return;

        ++*count;
        kill(pid, SIGHUP);
}
303
304 int afs_supervisor_discover()
305 {
306         int n = 0;
307         afs_discover("afb-daemon", discovered_cb, &n);
308         return n;
309 }
310
311 /*************************************************************************************/
312
313 static void f_subscribe(afb_req_t req)
314 {
315         struct json_object *args = afb_req_json(req);
316         int revoke, ok;
317
318         revoke = json_object_is_type(args, json_type_boolean)
319                 && !json_object_get_boolean(args);
320
321         ok = 1;
322         if (!revoke) {
323                 ok = !afb_req_subscribe(req, event_add_pid)
324                         && !afb_req_subscribe(req, event_del_pid);
325         }
326         if (revoke || !ok) {
327                 afb_req_unsubscribe(req, event_add_pid);
328                 afb_req_unsubscribe(req, event_del_pid);
329         }
330         afb_req_reply(req, NULL, ok ? NULL : "error", NULL);
331 }
332
333 static void f_list(afb_req_t req)
334 {
335         char pid[50];
336         struct json_object *resu, *item;
337         struct supervised *s;
338
339         resu = json_object_new_object();
340         s = superviseds;
341         while (s) {
342                 sprintf(pid, "%d", (int)s->cred->pid);
343                 item = NULL;
344                 wrap_json_pack(&item, "{si si si ss ss ss}",
345                                 "pid", (int)s->cred->pid,
346                                 "uid", (int)s->cred->uid,
347                                 "gid", (int)s->cred->gid,
348                                 "id", s->cred->id,
349                                 "label", s->cred->label,
350                                 "user", s->cred->user
351                                 );
352                 json_object_object_add(resu, pid, item);
353                 s = s->next;
354         }
355         afb_req_success(req, resu, NULL);
356 }
357
358 static void f_discover(afb_req_t req)
359 {
360         afs_supervisor_discover();
361         afb_req_success(req, NULL, NULL);
362 }
363
364 static void propagate(afb_req_t req, const char *verb)
365 {
366         struct afb_xreq *xreq;
367         struct json_object *args, *item;
368         struct supervised *s;
369         struct afb_api_item api;
370         int p;
371
372         xreq = xreq_from_req_x2(req);
373         args = afb_xreq_json(xreq);
374
375         /* extract the pid */
376         if (!json_object_object_get_ex(args, "pid", &item)) {
377                 afb_xreq_reply(xreq, NULL, "no-pid", NULL);
378                 return;
379         }
380         errno = 0;
381         p = json_object_get_int(item);
382         if (!p && errno) {
383                 afb_xreq_reply(xreq, NULL, "bad-pid", NULL);
384                 return;
385         }
386
387         /* get supervised of pid */
388         s = supervised_of_pid((pid_t)p);
389         if (!s) {
390                 afb_req_reply(req, NULL, "unknown-pid", NULL);
391                 return;
392         }
393         json_object_object_del(args, "pid");
394
395         /* replace the verb to call if needed */
396         if (verb)
397                 xreq->request.called_verb = verb;
398
399         /* call it now */
400         api = afb_stub_ws_client_api(s->stub);
401         api.itf->call(api.closure, xreq);
402 }
403
404 static void f_do(afb_req_t req)
405 {
406         propagate(req, NULL);
407 }
408
409 static void f_config(afb_req_t req)
410 {
411         propagate(req, NULL);
412 }
413
414 static void f_trace(afb_req_t req)
415 {
416         propagate(req, NULL);
417 }
418
419 static void f_sessions(afb_req_t req)
420 {
421         propagate(req, "slist");
422 }
423
424 static void f_session_close(afb_req_t req)
425 {
426         propagate(req, "sclose");
427 }
428
429 static void f_exit(afb_req_t req)
430 {
431         propagate(req, NULL);
432         afb_req_success(req, NULL, NULL);
433 }
434
435 static void f_debug_wait(afb_req_t req)
436 {
437         propagate(req, "wait");
438         afb_req_success(req, NULL, NULL);
439 }
440
441 static void f_debug_break(afb_req_t req)
442 {
443         propagate(req, "break");
444         afb_req_success(req, NULL, NULL);
445 }
446
447 /*************************************************************************************/
448
449 /**
450  * initialize the supervisor
451  */
452 static int init_supervisor(afb_api_t api)
453 {
454         int rc, fd;
455
456         event_add_pid = afb_api_make_event(api, "add-pid");
457         if (!afb_event_is_valid(event_add_pid)) {
458                 ERROR("Can't create added event");
459                 return -1;
460         }
461
462         event_del_pid = afb_api_make_event(api, "del-pid");
463         if (!afb_event_is_valid(event_del_pid)) {
464                 ERROR("Can't create deleted event");
465                 return -1;
466         }
467
468         /* create an empty set for superviseds */
469         empty_apiset = afb_apiset_create(supervision_apiname, 0);
470         if (!empty_apiset) {
471                 ERROR("Can't create supervision apiset");
472                 return -1;
473         }
474
475         /* create the supervision socket */
476         fd = create_supervision_socket(supervision_socket_path);
477         if (fd < 0)
478                 return fd;
479
480         /* listen the socket */
481         rc = listen(fd, 5);
482         if (rc < 0) {
483                 ERROR("refused to listen on socket");
484                 return rc;
485         }
486
487         /* integrate the socket to the loop */
488         supervision_fdev = afb_fdev_create(fd);
489         if (rc < 0) {
490                 ERROR("handling socket event isn't possible");
491                 return rc;
492         }
493         fdev_set_events(supervision_fdev, EPOLLIN);
494         fdev_set_callback(supervision_fdev, listening, (void*)(intptr_t)fd);
495
496         return 0;
497 }
498
499 /*************************************************************************************/
500
501 static const struct afb_auth _afb_auths_v2_supervisor[] = {
502         /* 0 */
503         {
504                 .type = afb_auth_Permission,
505                 .text = "urn:AGL:permission:#supervision:platform:access"
506         }
507 };
508
509 static const struct afb_verb_v3 _afb_verbs_supervisor[] = {
510     {
511         .verb = "subscribe",
512         .callback = f_subscribe,
513         .auth = &_afb_auths_v2_supervisor[0],
514         .info = NULL,
515         .session = AFB_SESSION_CHECK_X2
516     },
517     {
518         .verb = "list",
519         .callback = f_list,
520         .auth = &_afb_auths_v2_supervisor[0],
521         .info = NULL,
522         .session = AFB_SESSION_CHECK_X2
523     },
524     {
525         .verb = "config",
526         .callback = f_config,
527         .auth = &_afb_auths_v2_supervisor[0],
528         .info = NULL,
529         .session = AFB_SESSION_CHECK_X2
530     },
531     {
532         .verb = "do",
533         .callback = f_do,
534         .auth = &_afb_auths_v2_supervisor[0],
535         .info = NULL,
536         .session = AFB_SESSION_CHECK_X2
537     },
538     {
539         .verb = "trace",
540         .callback = f_trace,
541         .auth = &_afb_auths_v2_supervisor[0],
542         .info = NULL,
543         .session = AFB_SESSION_CHECK_X2
544     },
545     {
546         .verb = "sessions",
547         .callback = f_sessions,
548         .auth = &_afb_auths_v2_supervisor[0],
549         .info = NULL,
550         .session = AFB_SESSION_CHECK_X2
551     },
552     {
553         .verb = "session-close",
554         .callback = f_session_close,
555         .auth = &_afb_auths_v2_supervisor[0],
556         .info = NULL,
557         .session = AFB_SESSION_CHECK_X2
558     },
559     {
560         .verb = "exit",
561         .callback = f_exit,
562         .auth = &_afb_auths_v2_supervisor[0],
563         .info = NULL,
564         .session = AFB_SESSION_CHECK_X2
565     },
566     {
567         .verb = "debug-wait",
568         .callback = f_debug_wait,
569         .auth = &_afb_auths_v2_supervisor[0],
570         .info = NULL,
571         .session = AFB_SESSION_CHECK_X2
572     },
573     {
574         .verb = "debug-break",
575         .callback = f_debug_break,
576         .auth = &_afb_auths_v2_supervisor[0],
577         .info = NULL,
578         .session = AFB_SESSION_CHECK_X2
579     },
580     {
581         .verb = "discover",
582         .callback = f_discover,
583         .auth = &_afb_auths_v2_supervisor[0],
584         .info = NULL,
585         .session = AFB_SESSION_CHECK_X2
586     },
587     { .verb = NULL }
588 };
589
590 static const struct afb_binding_v3 _afb_binding_supervisor = {
591     .api = supervisor_apiname,
592     .specification = NULL,
593     .info = NULL,
594     .verbs = _afb_verbs_supervisor,
595     .preinit = NULL,
596     .init = init_supervisor,
597     .onevent = NULL,
598     .noconcurrency = 0
599 };
600
601 int afs_supervisor_add(
602                 struct afb_apiset *declare_set,
603                 struct afb_apiset * call_set)
604 {
605         return -!afb_api_v3_from_binding(&_afb_binding_supervisor, declare_set, call_set);
606 }
607