Improve interface xreq
[src/app-framework-binder.git] / src / afb-xreq.c
1 /*
2  * Copyright (C) 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19 #define NO_BINDING_VERBOSE_MACRO
20
21 #include <stdlib.h>
22 #include <string.h>
23 #include <errno.h>
24
25 #include <json-c/json.h>
26 #include <afb/afb-binding.h>
27
28 #include "afb-context.h"
29 #include "afb-xreq.h"
30 #include "afb-evt.h"
31 #include "afb-msg-json.h"
32 #include "afb-subcall.h"
33 #include "jobs.h"
34 #include "verbose.h"
35
36
37 static struct json_object *xreq_json_cb(void *closure);
38 static struct afb_arg xreq_get_cb(void *closure, const char *name);
39
40 static void xreq_success_cb(void *closure, struct json_object *obj, const char *info);
41 static void xreq_fail_cb(void *closure, const char *status, const char *info);
42
43 static const char *xreq_raw_cb(void *closure, size_t *size);
44 static void xreq_send_cb(void *closure, const char *buffer, size_t size);
45
46 static void *xreq_context_get_cb(void *closure);
47 static void xreq_context_set_cb(void *closure, void *value, void (*free_value)(void*));
48
49 static void xreq_addref_cb(void *closure);
50 static void xreq_unref_cb(void *closure);
51
52 static void xreq_session_close_cb(void *closure);
53 static int xreq_session_set_LOA_cb(void *closure, unsigned level);
54
55 static int xreq_subscribe_cb(void *closure, struct afb_event event);
56 static int xreq_unsubscribe_cb(void *closure, struct afb_event event);
57
58 static void xreq_subcall_cb(
59                 void *closure,
60                 const char *api,
61                 const char *verb,
62                 struct json_object *args,
63                 void (*callback)(void*, int, struct json_object*),
64                 void *cb_closure);
65
66 const struct afb_req_itf xreq_itf = {
67         .json = xreq_json_cb,
68         .get = xreq_get_cb,
69         .success = xreq_success_cb,
70         .fail = xreq_fail_cb,
71         .raw = xreq_raw_cb,
72         .send = xreq_send_cb,
73         .context_get = xreq_context_get_cb,
74         .context_set = xreq_context_set_cb,
75         .addref = xreq_addref_cb,
76         .unref = xreq_unref_cb,
77         .session_close = xreq_session_close_cb,
78         .session_set_LOA = xreq_session_set_LOA_cb,
79         .subscribe = xreq_subscribe_cb,
80         .unsubscribe = xreq_unsubscribe_cb,
81         .subcall = xreq_subcall_cb
82 };
83
84 static struct json_object *xreq_json_cb(void *closure)
85 {
86         struct afb_xreq *xreq = closure;
87         return xreq->json ? : (xreq->json = xreq->queryitf->json(xreq->query));
88 }
89
90 static struct afb_arg xreq_get_cb(void *closure, const char *name)
91 {
92         struct afb_xreq *xreq = closure;
93         if (xreq->queryitf->get)
94                 return xreq->queryitf->get(xreq->query, name);
95         else
96                 return afb_msg_json_get_arg(xreq_json_cb(closure), name);
97 }
98
99 static void xreq_success_cb(void *closure, struct json_object *obj, const char *info)
100 {
101         struct afb_xreq *xreq = closure;
102         afb_xreq_success(xreq, obj, info);
103 }
104
105 void afb_xreq_success(struct afb_xreq *xreq, struct json_object *obj, const char *info)
106 {
107         if (xreq->replied) {
108                 ERROR("reply called more than one time!!");
109                 json_object_put(obj);
110         } else {
111                 xreq->replied = 1;
112                 if (xreq->queryitf->success)
113                         xreq->queryitf->success(xreq->query, obj, info);
114                 else
115                         xreq->queryitf->reply(xreq->query, 0, afb_msg_json_reply_ok(info, obj, &xreq->context, NULL));
116         }
117 }
118
119 static void xreq_fail_cb(void *closure, const char *status, const char *info)
120 {
121         struct afb_xreq *xreq = closure;
122         afb_xreq_fail(xreq, status, info);
123 }
124
125 void afb_xreq_fail(struct afb_xreq *xreq, const char *status, const char *info)
126 {
127         if (xreq->replied) {
128                 ERROR("reply called more than one time!!");
129         } else {
130                 xreq->replied = 1;
131                 if (xreq->queryitf->fail)
132                         xreq->queryitf->fail(xreq->query, status, info);
133                 else
134                         xreq->queryitf->reply(xreq->query, 1, afb_msg_json_reply_error(status, info, &xreq->context, NULL));
135         }
136 }
137
138 static const char *xreq_raw_cb(void *closure, size_t *size)
139 {
140         struct afb_xreq *xreq = closure;
141         return afb_xreq_raw(xreq, size);
142 }
143
144 const char *afb_xreq_raw(struct afb_xreq *xreq, size_t *size)
145 {
146         const char *result = json_object_to_json_string(xreq_json_cb(xreq));
147         if (size != NULL)
148                 *size = strlen(result);
149         return result;
150 }
151
152 static void xreq_send_cb(void *closure, const char *buffer, size_t size)
153 {
154         struct json_object *obj = json_tokener_parse(buffer);
155         if (!obj == !buffer)
156                 xreq_success_cb(closure, obj, "fake send");
157         else
158                 xreq_fail_cb(closure, "fake-send-failed", "fake send");
159 }
160
161 static void *xreq_context_get_cb(void *closure)
162 {
163         struct afb_xreq *xreq = closure;
164         return afb_context_get(&xreq->context);
165 }
166
167 static void xreq_context_set_cb(void *closure, void *value, void (*free_value)(void*))
168 {
169         struct afb_xreq *xreq = closure;
170         afb_context_set(&xreq->context, value, free_value);
171 }
172
173 static void xreq_addref_cb(void *closure)
174 {
175         struct afb_xreq *xreq = closure;
176         afb_xreq_addref(xreq);
177 }
178
179 void afb_xreq_addref(struct afb_xreq *xreq)
180 {
181         xreq->refcount++;
182 }
183
184 static void xreq_unref_cb(void *closure)
185 {
186         struct afb_xreq *xreq = closure;
187         afb_xreq_unref(xreq);
188 }
189
190 void afb_xreq_unref(struct afb_xreq *xreq)
191 {
192         if (!--xreq->refcount) {
193                 xreq->queryitf->unref(xreq->query);
194         }
195 }
196
197 static void xreq_session_close_cb(void *closure)
198 {
199         struct afb_xreq *xreq = closure;
200         afb_context_close(&xreq->context);
201 }
202
203 static int xreq_session_set_LOA_cb(void *closure, unsigned level)
204 {
205         struct afb_xreq *xreq = closure;
206         return afb_context_change_loa(&xreq->context, level);
207 }
208
209 static int xreq_subscribe_cb(void *closure, struct afb_event event)
210 {
211         struct afb_xreq *xreq = closure;
212         return afb_xreq_subscribe(xreq, event);
213 }
214
215 int afb_xreq_subscribe(struct afb_xreq *xreq, struct afb_event event)
216 {
217         if (xreq->listener)
218                 return afb_evt_add_watch(xreq->listener, event);
219         if (xreq->queryitf->subscribe)
220                 return xreq->queryitf->subscribe(xreq->query, event);
221         ERROR("no event listener, subscription impossible");
222         errno = EINVAL;
223         return -1;
224 }
225
226 static int xreq_unsubscribe_cb(void *closure, struct afb_event event)
227 {
228         struct afb_xreq *xreq = closure;
229         return afb_xreq_unsubscribe(xreq, event);
230 }
231
232 int afb_xreq_unsubscribe(struct afb_xreq *xreq, struct afb_event event)
233 {
234         if (xreq->listener)
235                 return afb_evt_remove_watch(xreq->listener, event);
236         if (xreq->queryitf->unsubscribe)
237                 return xreq->queryitf->unsubscribe(xreq->query, event);
238         ERROR("no event listener, unsubscription impossible");
239         errno = EINVAL;
240         return -1;
241 }
242
243 static void xreq_subcall_cb(void *closure, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure)
244 {
245         struct afb_xreq *xreq = closure;
246         if (xreq->queryitf->subcall)
247                 xreq->queryitf->subcall(xreq->query, api, verb, args, callback, cb_closure);
248         else
249                 afb_subcall(&xreq->context, api, verb, args, callback, cb_closure, (struct afb_req){ .itf = &xreq_itf, .closure = xreq });
250 }
251
252 void afb_xreq_success_f(struct afb_xreq *xreq, struct json_object *obj, const char *info, ...)
253 {
254         char *message;
255         va_list args;
256         va_start(args, info);
257         if (info == NULL || vasprintf(&message, info, args) < 0)
258                 message = NULL;
259         va_end(args);
260         afb_xreq_success(xreq, obj, message);
261         free(message);
262 }
263
264 void afb_xreq_fail_f(struct afb_xreq *xreq, const char *status, const char *info, ...)
265 {
266         char *message;
267         va_list args;
268         va_start(args, info);
269         if (info == NULL || vasprintf(&message, info, args) < 0)
270                 message = NULL;
271         va_end(args);
272         afb_xreq_fail(xreq, status, message);
273         free(message);
274 }
275
276 static int xcheck(struct afb_xreq *xreq)
277 {
278         int stag = xreq->sessionflags;
279
280         if ((stag & (AFB_SESSION_CREATE|AFB_SESSION_CLOSE|AFB_SESSION_RENEW|AFB_SESSION_CHECK|AFB_SESSION_LOA_EQ)) != 0) {
281                 if (!afb_context_check(&xreq->context)) {
282                         afb_context_close(&xreq->context);
283                         afb_xreq_fail_f(xreq, "failed", "invalid token's identity");
284                         return 0;
285                 }
286         }
287
288         if ((stag & AFB_SESSION_CREATE) != 0) {
289                 if (afb_context_check_loa(&xreq->context, 1)) {
290                         afb_xreq_fail_f(xreq, "failed", "invalid creation state");
291                         return 0;
292                 }
293                 afb_context_change_loa(&xreq->context, 1);
294                 afb_context_refresh(&xreq->context);
295         }
296
297         if ((stag & (AFB_SESSION_CREATE | AFB_SESSION_RENEW)) != 0)
298                 afb_context_refresh(&xreq->context);
299
300         if ((stag & AFB_SESSION_CLOSE) != 0) {
301                 afb_context_change_loa(&xreq->context, 0);
302                 afb_context_close(&xreq->context);
303         }
304
305         if ((stag & AFB_SESSION_LOA_GE) != 0) {
306                 int loa = (stag >> AFB_SESSION_LOA_SHIFT) & AFB_SESSION_LOA_MASK;
307                 if (!afb_context_check_loa(&xreq->context, loa)) {
308                         afb_xreq_fail_f(xreq, "failed", "invalid LOA");
309                         return 0;
310                 }
311         }
312
313         if ((stag & AFB_SESSION_LOA_LE) != 0) {
314                 int loa = (stag >> AFB_SESSION_LOA_SHIFT) & AFB_SESSION_LOA_MASK;
315                 if (afb_context_check_loa(&xreq->context, loa + 1)) {
316                         afb_xreq_fail_f(xreq, "failed", "invalid LOA");
317                         return 0;
318                 }
319         }
320         return 1;
321 }
322
323 static void xreq_run_cb(int signum, void *arg)
324 {
325         struct afb_xreq *xreq = arg;
326
327         if (signum == 0)
328                 xreq->callback((struct afb_req){ .itf = &xreq_itf, .closure = xreq });
329         else {
330                 afb_xreq_fail_f(xreq, "aborted", "signal %s(%d) caught", strsignal(signum), signum);
331                 
332         }
333         afb_xreq_unref(xreq);
334 }
335
336 void afb_xreq_call(struct afb_xreq *xreq)
337 {
338         int rc;
339         if (xcheck(xreq)) {
340                 afb_xreq_addref(xreq);
341                 rc = jobs_queue(xreq->group, xreq->timeout, xreq_run_cb, xreq);
342                 if (rc < 0) {
343                         /* TODO: allows or not to proccess it directly as when no threading? (see above) */
344                         ERROR("can't process job with threads: %m");
345                         afb_xreq_fail_f(xreq, "cancelled", "not able to pipe a job for the task");
346                         xreq_unref_cb(xreq);
347                 }
348         }
349 }
350