api-v3: First draft
[src/app-framework-binder.git] / src / afb-stub-ws.c
1 /*
2  * Copyright (C) 2015-2018 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #define NO_PLUGIN_VERBOSE_MACRO
21
22 #include <stdlib.h>
23 #include <string.h>
24 #include <assert.h>
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <errno.h>
28 #include <endian.h>
29 #include <netdb.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <sys/un.h>
33 #include <pthread.h>
34
35 #include <json-c/json.h>
36
37 #include <afb/afb-event-x2.h>
38
39 #include "afb-session.h"
40 #include "afb-cred.h"
41 #include "afb-api.h"
42 #include "afb-apiset.h"
43 #include "afb-proto-ws.h"
44 #include "afb-stub-ws.h"
45 #include "afb-context.h"
46 #include "afb-evt.h"
47 #include "afb-xreq.h"
48 #include "verbose.h"
49 #include "fdev.h"
50 #include "jobs.h"
51
52 struct afb_stub_ws;
53
54
55 /*
56  * structure for recording calls on client side
57  */
58 struct client_call {
59         struct client_call *next;       /* the next call */
60         struct afb_stub_ws *stubws;     /* the stub_ws */
61         struct afb_xreq *xreq;          /* the request handle */
62         uint32_t msgid;                 /* the message identifier */
63 };
64
65 /*
66  * structure for a ws request
67  */
68 struct server_req {
69         struct afb_xreq xreq;           /* the xreq */
70         struct afb_stub_ws *stubws;     /* the client of the request */
71         struct afb_proto_ws_call *call; /* the incoming call */
72 };
73
74 /*
75  * structure for recording events on client side
76  */
77 struct client_event
78 {
79         struct client_event *next;
80         struct afb_event_x2 *event;
81         int id;
82         int refcount;
83 };
84
85 /*
86  * structure for recording describe requests
87  */
88 struct client_describe
89 {
90         struct afb_stub_ws *stubws;
91         struct jobloop *jobloop;
92         struct json_object *result;
93 };
94
95 /*
96  * structure for jobs of describing
97  */
98 struct server_describe
99 {
100         struct afb_stub_ws *stubws;
101         struct afb_proto_ws_describe *describe;
102 };
103
104 /*
105  * structure for recording sessions
106  */
107 struct server_session
108 {
109         struct server_session *next;
110         struct afb_session *session;
111 };
112
113 /******************* stub description for client or servers ******************/
114
115 struct afb_stub_ws
116 {
117         /* count of references */
118         int refcount;
119
120         /* resource control */
121         pthread_mutex_t mutex;
122
123         /* protocol */
124         struct afb_proto_ws *proto;
125
126         /* listener for events (server side) */
127         struct afb_evt_listener *listener;
128
129         /* event replica (client side) */
130         struct client_event *events;
131
132         /* credentials of the client (server side) */
133         struct afb_cred *cred;
134
135         /* sessions (server side) */
136         struct server_session *sessions;
137
138         /* apiset */
139         struct afb_apiset *apiset;
140
141         /* on hangup callback */
142         void (*on_hangup)(struct afb_stub_ws *);
143
144         /* the api name */
145         char apiname[1];
146 };
147
148 /******************* ws request part for server *****************/
149
150 /* decrement the reference count of the request and free/release it on falling to null */
151 static void server_req_destroy_cb(struct afb_xreq *xreq)
152 {
153         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
154
155         afb_context_disconnect(&wreq->xreq.context);
156         afb_cred_unref(wreq->xreq.cred);
157         json_object_put(wreq->xreq.json);
158         afb_proto_ws_call_unref(wreq->call);
159         afb_stub_ws_unref(wreq->stubws);
160         free(wreq);
161 }
162
163 static void server_req_reply_cb(struct afb_xreq *xreq, struct json_object *obj, const char *error, const char *info)
164 {
165         int rc;
166         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
167
168         rc = afb_proto_ws_call_reply(wreq->call, obj, error, info);
169         if (rc < 0)
170                 ERROR("error while sending reply");
171         json_object_put(obj);
172 }
173
174 static int server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event_x2 *event)
175 {
176         int rc;
177         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
178
179         rc = afb_evt_event_x2_add_watch(wreq->stubws->listener, event);
180         if (rc >= 0)
181                 rc = afb_proto_ws_call_subscribe(wreq->call,  afb_evt_event_x2_fullname(event), afb_evt_event_x2_id(event));
182         if (rc < 0)
183                 ERROR("error while subscribing event");
184         return rc;
185 }
186
187 static int server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event_x2 *event)
188 {
189         int rc, rc2;
190         struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
191
192         rc = afb_proto_ws_call_unsubscribe(wreq->call,  afb_evt_event_x2_fullname(event), afb_evt_event_x2_id(event));
193         rc2 = afb_evt_event_x2_remove_watch(wreq->stubws->listener, event);
194         if (rc >= 0 && rc2 < 0)
195                 rc = rc2;
196         if (rc < 0)
197                 ERROR("error while unsubscribing event");
198         return rc;
199 }
200
201 static const struct afb_xreq_query_itf server_req_xreq_itf = {
202         .reply = server_req_reply_cb,
203         .unref = server_req_destroy_cb,
204         .subscribe = server_req_subscribe_cb,
205         .unsubscribe = server_req_unsubscribe_cb
206 };
207
208 /******************* client part **********************************/
209
210 /* search the event */
211 static struct client_event *client_event_search(struct afb_stub_ws *stubws, uint32_t eventid, const char *name)
212 {
213         struct client_event *ev;
214
215         ev = stubws->events;
216         while (ev != NULL && (ev->id != eventid || 0 != strcmp(afb_evt_event_x2_fullname(ev->event), name)))
217                 ev = ev->next;
218
219         return ev;
220 }
221
222 /* on call, propagate it to the ws service */
223 static void client_call_cb(void * closure, struct afb_xreq *xreq)
224 {
225         struct afb_stub_ws *stubws = closure;
226
227         afb_proto_ws_client_call(
228                         stubws->proto,
229                         xreq->request.called_verb,
230                         afb_xreq_json(xreq),
231                         afb_session_uuid(xreq->context.session),
232                         xreq,
233                         xreq_on_behalf_cred_export(xreq));
234         afb_xreq_unhooked_addref(xreq);
235 }
236
237 static void client_on_description_cb(void *closure, struct json_object *data)
238 {
239         struct client_describe *desc = closure;
240
241         desc->result = data;
242         jobs_leave(desc->jobloop);
243 }
244
245 static void client_send_describe_cb(int signum, void *closure, struct jobloop *jobloop)
246 {
247         struct client_describe *desc = closure;
248
249         if (signum)
250                 jobs_leave(jobloop);
251         else {
252                 desc->jobloop = jobloop;
253                 afb_proto_ws_client_describe(desc->stubws->proto, client_on_description_cb, desc);
254         }
255 }
256
257 /* get the description */
258 static struct json_object *client_describe_cb(void * closure)
259 {
260         struct client_describe desc;
261
262         /* synchronous job: send the request and wait its result */
263         desc.stubws = closure;
264         desc.result = NULL;
265         jobs_enter(NULL, 0, client_send_describe_cb, &desc);
266         return desc.result;
267 }
268
269 /******************* server part: manage events **********************************/
270
271 static void server_event_add(void *closure, const char *event, int eventid)
272 {
273         struct afb_stub_ws *stubws = closure;
274
275         afb_proto_ws_server_event_create(stubws->proto, event, eventid);
276 }
277
278 static void server_event_remove(void *closure, const char *event, int eventid)
279 {
280         struct afb_stub_ws *stubws = closure;
281
282         afb_proto_ws_server_event_remove(stubws->proto, event, eventid);
283 }
284
285 static void server_event_push(void *closure, const char *event, int eventid, struct json_object *object)
286 {
287         struct afb_stub_ws *stubws = closure;
288
289         afb_proto_ws_server_event_push(stubws->proto, event, eventid, object);
290         json_object_put(object);
291 }
292
293 static void server_event_broadcast(void *closure, const char *event, int eventid, struct json_object *object)
294 {
295         struct afb_stub_ws *stubws = closure;
296
297         afb_proto_ws_server_event_broadcast(stubws->proto, event, object);
298         json_object_put(object);
299 }
300
301 /*****************************************************/
302
303 static void on_reply(void *closure, void *request, struct json_object *object, const char *error, const char *info)
304 {
305         struct afb_xreq *xreq = request;
306
307         afb_xreq_reply(xreq, object, error, info);
308         afb_xreq_unhooked_unref(xreq);
309 }
310
311 static void on_event_create(void *closure, const char *event_name, int event_id)
312 {
313         struct afb_stub_ws *stubws = closure;
314         struct client_event *ev;
315
316         /* check conflicts */
317         ev = client_event_search(stubws, event_id, event_name);
318         if (ev != NULL) {
319                 ev->refcount++;
320                 return;
321         }
322
323         /* no conflict, try to add it */
324         ev = malloc(sizeof *ev);
325         if (ev != NULL) {
326                 ev->event = afb_evt_event_x2_create(event_name);
327                 if (ev->event != NULL) {
328                         ev->refcount = 1;
329                         ev->id = event_id;
330                         ev->next = stubws->events;
331                         stubws->events = ev;
332                         return;
333                 }
334                 free(ev);
335         }
336         ERROR("can't create event %s, out of memory", event_name);
337 }
338
339 static void on_event_remove(void *closure, const char *event_name, int event_id)
340 {
341         struct afb_stub_ws *stubws = closure;
342         struct client_event *ev, **prv;
343
344         /* check conflicts */
345         ev = client_event_search(stubws, event_id, event_name);
346         if (ev == NULL)
347                 return;
348
349         /* decrease the reference count */
350         if (--ev->refcount)
351                 return;
352
353         /* unlinks the event */
354         prv = &stubws->events;
355         while (*prv != ev)
356                 prv = &(*prv)->next;
357         *prv = ev->next;
358
359         /* destroys the event */
360         afb_evt_event_x2_unref(ev->event);
361         free(ev);
362 }
363
364 static void on_event_subscribe(void *closure, void *request, const char *event_name, int event_id)
365 {
366         struct afb_stub_ws *stubws = closure;
367         struct afb_xreq *xreq = request;
368         struct client_event *ev;
369
370         /* check conflicts */
371         ev = client_event_search(stubws, event_id, event_name);
372         if (ev == NULL)
373                 return;
374
375         if (afb_xreq_subscribe(xreq, ev->event) < 0)
376                 ERROR("can't subscribe: %m");
377 }
378
379 static void on_event_unsubscribe(void *closure, void *request, const char *event_name, int event_id)
380 {
381         struct afb_stub_ws *stubws = closure;
382         struct afb_xreq *xreq = request;
383         struct client_event *ev;
384
385         /* check conflicts */
386         ev = client_event_search(stubws, event_id, event_name);
387         if (ev == NULL)
388                 return;
389
390         if (afb_xreq_unsubscribe(xreq, ev->event) < 0)
391                 ERROR("can't unsubscribe: %m");
392 }
393
394 static void on_event_push(void *closure, const char *event_name, int event_id, struct json_object *data)
395 {
396         struct afb_stub_ws *stubws = closure;
397         struct client_event *ev;
398
399         /* check conflicts */
400         ev = client_event_search(stubws, event_id, event_name);
401         if (ev)
402                 afb_evt_event_x2_push(ev->event, data);
403         else
404                 ERROR("unreadable push event");
405 }
406
407 static void on_event_broadcast(void *closure, const char *event_name, struct json_object *data)
408 {
409         afb_evt_broadcast(event_name, data);
410 }
411
412 /*****************************************************/
413
414 static void record_session(struct afb_stub_ws *stubws, struct afb_session *session)
415 {
416         struct server_session *s, **prv;
417
418         /* search */
419         prv = &stubws->sessions;
420         while ((s = *prv)) {
421                 if (s->session == session)
422                         return;
423                 if (afb_session_is_closed(s->session)) {
424                         *prv = s->next;
425                         afb_session_unref(s->session);
426                         free(s);
427                 }
428                 else
429                         prv = &s->next;
430         }
431
432         /* create */
433         s = malloc(sizeof *s);
434         if (s) {
435                 s->session = afb_session_addref(session);
436                 s->next = stubws->sessions;
437                 stubws->sessions = s;
438         }
439 }
440
441 static void release_all_sessions(struct afb_stub_ws *stubws)
442 {
443         struct server_session *s, *n;
444
445         s = stubws->sessions;
446         stubws->sessions = NULL;
447         while(s) {
448                 n = s->next;
449                 afb_session_unref(s->session);
450                 free(s);
451                 s = n;
452         }
453 }
454
455 /*****************************************************/
456
457 static void on_call(void *closure, struct afb_proto_ws_call *call, const char *verb, struct json_object *args, const char *sessionid, const char *user_creds)
458 {
459         struct afb_stub_ws *stubws = closure;
460         struct server_req *wreq;
461
462         afb_stub_ws_addref(stubws);
463
464         /* create the request */
465         wreq = malloc(sizeof *wreq);
466         if (wreq == NULL)
467                 goto out_of_memory;
468
469         afb_xreq_init(&wreq->xreq, &server_req_xreq_itf);
470         wreq->stubws = stubws;
471         wreq->call = call;
472
473         /* init the context */
474         if (afb_context_connect(&wreq->xreq.context, sessionid, NULL) < 0)
475                 goto unconnected;
476         wreq->xreq.context.validated = 1;
477         record_session(stubws, wreq->xreq.context.session);
478         if (wreq->xreq.context.created)
479                 afb_session_set_autoclose(wreq->xreq.context.session, 1);
480
481         /* makes the call */
482         wreq->xreq.cred = afb_cred_mixed_on_behalf_import(stubws->cred, sessionid, user_creds);
483         wreq->xreq.request.called_api = stubws->apiname;
484         wreq->xreq.request.called_verb = verb;
485         wreq->xreq.json = args;
486         afb_xreq_process(&wreq->xreq, stubws->apiset);
487         return;
488
489 unconnected:
490         free(wreq);
491 out_of_memory:
492         json_object_put(args);
493         afb_stub_ws_unref(stubws);
494         afb_proto_ws_call_reply(call, NULL, "internal-error", NULL);
495         afb_proto_ws_call_unref(call);
496 }
497
498 static void server_describe_sjob(int signum, void *closure)
499 {
500         struct json_object *obj;
501         struct server_describe *desc = closure;
502
503         /* get the description if possible */
504         obj = !signum ? afb_apiset_describe(desc->stubws->apiset, desc->stubws->apiname) : NULL;
505
506         /* send it */
507         afb_proto_ws_describe_put(desc->describe, obj);
508         json_object_put(obj);
509         afb_stub_ws_unref(desc->stubws);
510 }
511
512 static void server_describe_job(int signum, void *closure)
513 {
514         server_describe_sjob(signum, closure);
515         free(closure);
516 }
517
518 static void on_describe(void *closure, struct afb_proto_ws_describe *describe)
519 {
520         struct server_describe *desc, sdesc;
521         struct afb_stub_ws *stubws = closure;
522
523         /* allocate (if possible) and init */
524         desc = malloc(sizeof *desc);
525         if (desc == NULL)
526                 desc = &sdesc;
527         desc->stubws = stubws;
528         desc->describe = describe;
529         afb_stub_ws_addref(stubws);
530
531         /* process */
532         if (desc == &sdesc)
533                 jobs_call(NULL, 0, server_describe_sjob, desc);
534         else {
535                 if (jobs_queue(NULL, 0, server_describe_job, desc) < 0)
536                         jobs_call(NULL, 0, server_describe_job, desc);
537         }
538 }
539
540 /*****************************************************/
541
542 static const struct afb_proto_ws_client_itf client_itf =
543 {
544         .on_reply = on_reply,
545         .on_event_create = on_event_create,
546         .on_event_remove = on_event_remove,
547         .on_event_subscribe = on_event_subscribe,
548         .on_event_unsubscribe = on_event_unsubscribe,
549         .on_event_push = on_event_push,
550         .on_event_broadcast = on_event_broadcast,
551 };
552
553 static const struct afb_proto_ws_server_itf server_itf =
554 {
555         .on_call = on_call,
556         .on_describe = on_describe
557 };
558
559 static struct afb_api_itf ws_api_itf = {
560         .call = client_call_cb,
561         .describe = client_describe_cb
562 };
563
564 /* the interface for events pushing */
565 static const struct afb_evt_itf server_evt_itf = {
566         .broadcast = server_event_broadcast,
567         .push = server_event_push,
568         .add = server_event_add,
569         .remove = server_event_remove
570 };
571
572 /*****************************************************/
573
574 static void drop_all_events(struct afb_stub_ws *stubws)
575 {
576         struct client_event *ev, *nxt;
577
578         ev = stubws->events;
579         stubws->events = NULL;
580
581         while (ev) {
582                 nxt = ev->next;
583                 afb_evt_event_x2_unref(ev->event);
584                 free(ev);
585                 ev = nxt;
586         }
587 }
588
589 /* callback when receiving a hangup */
590 static void on_hangup(void *closure)
591 {
592         struct afb_stub_ws *stubws = closure;
593
594         afb_stub_ws_addref(stubws);
595         if (stubws->on_hangup)
596                 stubws->on_hangup(stubws);
597
598         release_all_sessions(stubws);
599         afb_stub_ws_unref(stubws);
600 }
601
602 /*****************************************************/
603
604 static struct afb_stub_ws *afb_stub_ws_create(struct fdev *fdev, const char *apiname, struct afb_apiset *apiset, int client)
605 {
606         struct afb_stub_ws *stubws;
607
608         stubws = calloc(1, sizeof *stubws + strlen(apiname));
609         if (stubws == NULL)
610                 errno = ENOMEM;
611         else {
612                 if (client)
613                         stubws->proto = afb_proto_ws_create_client(fdev, &client_itf, stubws);
614                 else
615                         stubws->proto = afb_proto_ws_create_server(fdev, &server_itf, stubws);
616
617                 if (stubws->proto) {
618                         strcpy(stubws->apiname, apiname);
619                         stubws->apiset = afb_apiset_addref(apiset);
620                         stubws->refcount = 1;
621                         afb_proto_ws_on_hangup(stubws->proto, on_hangup);
622                         return stubws;
623                 }
624                 free(stubws);
625         }
626         fdev_unref(fdev);
627         return NULL;
628 }
629
630 struct afb_stub_ws *afb_stub_ws_create_client(struct fdev *fdev, const char *apiname, struct afb_apiset *apiset)
631 {
632         return afb_stub_ws_create(fdev, apiname, apiset, 1);
633 }
634
635 struct afb_stub_ws *afb_stub_ws_create_server(struct fdev *fdev, const char *apiname, struct afb_apiset *apiset)
636 {
637         struct afb_stub_ws *stubws;
638
639         stubws = afb_stub_ws_create(fdev, apiname, apiset, 0);
640         if (stubws) {
641                 stubws->cred = afb_cred_create_for_socket(fdev_fd(fdev));
642                 stubws->listener = afb_evt_listener_create(&server_evt_itf, stubws);
643                 if (stubws->listener != NULL)
644                         return stubws;
645                 afb_stub_ws_unref(stubws);
646         }
647         return NULL;
648 }
649
650 void afb_stub_ws_unref(struct afb_stub_ws *stubws)
651 {
652         if (!__atomic_sub_fetch(&stubws->refcount, 1, __ATOMIC_RELAXED)) {
653                 drop_all_events(stubws);
654                 if (stubws->listener)
655                         afb_evt_listener_unref(stubws->listener);
656                 release_all_sessions(stubws);
657                 afb_proto_ws_unref(stubws->proto);
658                 afb_cred_unref(stubws->cred);
659                 afb_apiset_unref(stubws->apiset);
660                 free(stubws);
661         }
662 }
663
664 void afb_stub_ws_addref(struct afb_stub_ws *stubws)
665 {
666         __atomic_add_fetch(&stubws->refcount, 1, __ATOMIC_RELAXED);
667 }
668
669 void afb_stub_ws_on_hangup(struct afb_stub_ws *stubws, void (*on_hangup)(struct afb_stub_ws*))
670 {
671         stubws->on_hangup = on_hangup;
672 }
673
674 const char *afb_stub_ws_name(struct afb_stub_ws *stubws)
675 {
676         return stubws->apiname;
677 }
678
679 struct afb_api_item afb_stub_ws_client_api(struct afb_stub_ws *stubws)
680 {
681         struct afb_api_item api;
682
683         assert(!stubws->listener); /* check client */
684         api.closure = stubws;
685         api.itf = &ws_api_itf;
686         api.group = NULL;
687         return api;
688 }
689
690 int afb_stub_ws_client_add(struct afb_stub_ws *stubws, struct afb_apiset *apiset)
691 {
692         return afb_apiset_add(apiset, stubws->apiname, afb_stub_ws_client_api(stubws));
693 }
694