/******************************************************************************/
-static inline void xreq_addref(struct afb_xreq *xreq)
+static void xreq_finalize(struct afb_xreq *xreq)
+{
+ if (!xreq->replied)
+ afb_xreq_fail(xreq, "error", "no reply");
+ if (xreq->hookflags)
+ afb_hook_xreq_end(xreq);
+ if (xreq->caller)
+ afb_xreq_unhooked_unref(xreq->caller);
+ xreq->queryitf->unref(xreq);
+}
+
+inline void afb_xreq_unhooked_addref(struct afb_xreq *xreq)
{
__atomic_add_fetch(&xreq->refcount, 1, __ATOMIC_RELAXED);
}
-static inline void xreq_unref(struct afb_xreq *xreq)
+inline void afb_xreq_unhooked_unref(struct afb_xreq *xreq)
{
- if (!__atomic_sub_fetch(&xreq->refcount, 1, __ATOMIC_RELAXED)) {
- if (!xreq->replied)
- afb_xreq_fail(xreq, "error", "no reply");
- if (xreq->hookflags)
- afb_hook_xreq_end(xreq);
- if (xreq->caller)
- xreq_unref(xreq->caller);
- xreq->queryitf->unref(xreq);
- }
+ if (!__atomic_sub_fetch(&xreq->refcount, 1, __ATOMIC_RELAXED))
+ xreq_finalize(xreq);
}
/******************************************************************************/
-extern const struct afb_req_itf xreq_itf;
-extern const struct afb_req_itf xreq_hooked_itf;
-
static inline struct afb_req to_req(struct afb_xreq *xreq)
{
- return (struct afb_req){ .itf = xreq->hookflags ? &xreq_hooked_itf : &xreq_itf, .closure = xreq };
+ return (struct afb_req){ .itf = xreq->itf, .closure = xreq };
}
/******************************************************************************/
subcall->completion(subcall, status, result);
json_object_put(result);
- afb_xreq_unref(&subcall->xreq);
+ afb_xreq_unhooked_unref(&subcall->xreq);
}
static void subcall_destroy_cb(struct afb_xreq *xreq)
subcall->xreq.api = api;
subcall->xreq.verb = verb;
subcall->xreq.caller = caller;
- xreq_addref(caller);
+ afb_xreq_unhooked_addref(caller);
}
return subcall;
}
subcall->xreq.caller, subcall->xreq.api, subcall->xreq.verb,
subcall->xreq.json, subcall_reply_direct_cb, &subcall->xreq);
} else {
- afb_xreq_addref(&subcall->xreq);
+ afb_xreq_unhooked_addref(&subcall->xreq);
afb_xreq_process(&subcall->xreq, subcall->xreq.caller->apiset);
}
}
{
int rc;
- afb_xreq_addref(&subcall->xreq);
+ afb_xreq_unhooked_addref(&subcall->xreq);
rc = jobs_enter(NULL, 0, subcall_sync_enter, subcall);
*result = subcall->result;
if (rc < 0 || subcall->status < 0) {
*result = *result ?: afb_msg_json_internal_error();
rc = -1;
}
- afb_xreq_unref(&subcall->xreq);
+ afb_xreq_unhooked_unref(&subcall->xreq);
return rc;
}
static void xreq_addref_cb(void *closure)
{
struct afb_xreq *xreq = closure;
- xreq_addref(xreq);
+ afb_xreq_unhooked_addref(xreq);
}
static void xreq_unref_cb(void *closure)
{
struct afb_xreq *xreq = closure;
- xreq_unref(xreq);
+ afb_xreq_unhooked_unref(xreq);
}
static void xreq_session_close_cb(void *closure)
void afb_xreq_success_f(struct afb_xreq *xreq, struct json_object *obj, const char *info, ...)
{
- char *message;
va_list args;
+
va_start(args, info);
- if (info == NULL || vasprintf(&message, info, args) < 0)
- message = NULL;
+ afb_req_success_v(to_req(xreq), obj, info, args);
va_end(args);
- afb_xreq_success(xreq, obj, message);
- free(message);
}
void afb_xreq_fail(struct afb_xreq *xreq, const char *status, const char *info)
void afb_xreq_fail_f(struct afb_xreq *xreq, const char *status, const char *info, ...)
{
- char *message;
va_list args;
+
va_start(args, info);
- if (info == NULL || vasprintf(&message, info, args) < 0)
- message = NULL;
+ afb_req_fail_v(to_req(xreq), status, info, args);
va_end(args);
- afb_xreq_fail(xreq, status, message);
- free(message);
+
}
const char *afb_xreq_raw(struct afb_xreq *xreq, size_t *size)
void afb_xreq_init(struct afb_xreq *xreq, const struct afb_xreq_query_itf *queryitf)
{
memset(xreq, 0, sizeof *xreq);
+ xreq->itf = &xreq_hooked_itf; /* hook by default */
xreq->refcount = 1;
xreq->queryitf = queryitf;
}
afb_xreq_fail_f(xreq, "unknown-verb", "verb %s unknown within api %s", xreq->verb, xreq->api);
}
-static void process_sync(struct afb_xreq *xreq)
+static void init_hooking(struct afb_xreq *xreq)
{
- const struct afb_api *api;
-
- /* init hooking */
afb_hook_init_xreq(xreq);
if (xreq->hookflags)
afb_hook_xreq_begin(xreq);
-
- /* search the api */
- api = (const struct afb_api*)xreq->context.api_key;
- if (api)
- api->itf->call(api->closure, xreq);
- else {
- api = afb_apiset_lookup_started(xreq->apiset, xreq->api, 1);
- if (errno == ENOENT)
- afb_xreq_fail_f(xreq, "unknown-api", "api %s not found", xreq->api);
- else
- afb_xreq_fail_f(xreq, "bad-api-state", "api %s not started correctly: %m", xreq->api);
- }
+ else
+ xreq->itf = &xreq_itf; /* unhook the interface */
}
+/**
+ * job callback for asynchronous and secured processing of the request.
+ */
static void process_async(int signum, void *arg)
{
struct afb_xreq *xreq = arg;
+ const struct afb_api *api;
if (signum != 0) {
+ /* emit the error (assumes that hooking is initialised) */
afb_xreq_fail_f(xreq, "aborted", "signal %s(%d) caught", strsignal(signum), signum);
} else {
- process_sync(xreq);
+ /* init hooking */
+ init_hooking(xreq);
+ /* invoke api call method to process the reqiest */
+ api = (const struct afb_api*)xreq->context.api_key;
+ api->itf->call(api->closure, xreq);
}
- xreq_unref(xreq);
+ /* release the request */
+ afb_xreq_unhooked_unref(xreq);
+}
+
+/**
+ * Early request failure of the request 'xreq' with, as usual, 'status' and 'info'
+ * The early failure occurs only in function 'afb_xreq_process' where normally,
+ * the hooking is not initialised. So this "early" failure takes care to initialise
+ * the hooking in first.
+ */
+static void early_failure(struct afb_xreq *xreq, const char *status, const char *info, ...)
+{
+ va_list args;
+
+ /* init hooking */
+ init_hooking(xreq);
+
+ /* send error */
+ va_start(args, info);
+ afb_req_fail_v(to_req(xreq), status, info, args);
+ va_end(args);
}
+/**
+ * Enqueue a job for processing the request 'xreq' using the given 'apiset'.
+ * Errors are reported as request failures.
+ */
void afb_xreq_process(struct afb_xreq *xreq, struct afb_apiset *apiset)
{
- const void *jobkey;
const struct afb_api *api;
struct afb_xreq *caller;
+ /* lookup at the api */
xreq->apiset = apiset;
api = afb_apiset_lookup_started(apiset, xreq->api, 1);
+ if (!api) {
+ if (errno == ENOENT)
+ early_failure(xreq, "unknown-api", "api %s not found (for verb %s)", xreq->api, xreq->verb);
+ else
+ early_failure(xreq, "bad-api-state", "api %s not started correctly: %m", xreq->api);
+ goto end;
+ }
xreq->context.api_key = api;
- if (!api || !api->noconcurrency)
- jobkey = NULL;
+ /* check self locking */
+ if (!api->noconcurrency)
+ api = NULL;
else {
caller = xreq->caller;
while (caller) {
if (caller->context.api_key == api) {
/* noconcurrency lock detected */
ERROR("self-lock detected in call stack for API %s", xreq->api);
- afb_xreq_fail_f(xreq, "cancelled", "recursive self lock, API %s", xreq->api);
+ early_failure(xreq, "self-locked", "recursive self lock, API %s", xreq->api);
goto end;
}
caller = caller->caller;
}
- jobkey = api;
}
- xreq_addref(xreq);
- if (jobs_queue(jobkey, afb_apiset_timeout_get(apiset), process_async, xreq) < 0) {
+
+ /* queue the request job */
+ afb_xreq_unhooked_addref(xreq);
+ if (jobs_queue(api, afb_apiset_timeout_get(apiset), process_async, xreq) < 0) {
/* TODO: allows or not to proccess it directly as when no threading? (see above) */
ERROR("can't process job with threads: %m");
- afb_xreq_fail_f(xreq, "cancelled", "not able to create a job for the task");
- xreq_unref(xreq);
+ early_failure(xreq, "cancelled", "not able to create a job for the task");
+ afb_xreq_unhooked_unref(xreq);
}
end:
- xreq_unref(xreq);
+ afb_xreq_unhooked_unref(xreq);
}