From 7e0abe76db7b90369429bf387d7aad0fb5a42328 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Jos=C3=A9=20Bollo?= Date: Thu, 9 Jun 2016 07:54:31 +0200 Subject: [PATCH] Events: refactoring MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit This new version allows to subscribe a client for an event. The event should first be created for the API (the API's prefix is added) using 'afb_daemon_make_event'. After that, plugins can subscribe or unsubscribe their clients (identified through requests) to the events that it generates. See 'afb_req_subscribe' and 'afb_req_unsubscribe'. Events created by 'afb_daemon_make_event' can be widely broadcasted using 'afb_event_broadcast' or pushed only to suscribers using 'afb_event_push'. Events can be destroyed using 'afb_event_drop'. Change-Id: I7c0bed5e625c2052dcd81c6bfe960def1fa032f3 Signed-off-by: José Bollo --- include/afb/afb-event-itf.h | 84 ++++++++++++++ include/afb/afb-plugin.h | 17 ++- include/afb/afb-req-itf.h | 28 +++++ src/CMakeLists.txt | 1 + src/afb-api-dbus.c | 23 +--- src/afb-api-dbus.h | 4 + src/afb-api-so.c | 26 ++++- src/afb-context.c | 1 + src/afb-evt.c | 261 ++++++++++++++++++++++++++++++++++++++++++++ src/afb-evt.h | 36 ++++++ src/afb-hreq.c | 15 ++- src/afb-hreq.h | 3 + src/afb-ws-json1.c | 38 ++++--- src/afb-ws-json1.h | 3 + src/session.c | 108 ------------------ src/session.h | 16 --- 16 files changed, 500 insertions(+), 164 deletions(-) create mode 100644 include/afb/afb-event-itf.h create mode 100644 src/afb-evt.c create mode 100644 src/afb-evt.h diff --git a/include/afb/afb-event-itf.h b/include/afb/afb-event-itf.h new file mode 100644 index 00000000..47ffa387 --- /dev/null +++ b/include/afb/afb-event-itf.h @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2016 "IoT.bzh" + * Author: José Bollo + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +/* avoid inclusion of */ +struct json_object; + +/* + * Interface for handling requests. + * It records the functions to be called for the request. + * Don't use this structure directly. + * Use the helper functions documented below. + */ +struct afb_event_itf { + /* CAUTION: respect the order, add at the end */ + + int (*broadcast)(void *closure, struct json_object *obj); + int (*push)(void *closure, struct json_object *obj); + void (*drop)(void *closure); +}; + +/* + * Describes the request by plugins from afb-daemon + */ +struct afb_event { + const struct afb_event_itf *itf; /* the interface to use */ + void *closure; /* the closure argument for functions of 'itf' */ +}; + +/* + * Broadcasts widely the 'event' with the data 'object'. + * 'object' can be NULL. + * + * For conveniency, the function calls 'json_object_put' for 'object'. + * Thus, in the case where 'object' should remain available after + * the function returns, the function 'json_object_get' shall be used. + * + * Returns the count of clients that received the event. + */ +static inline int afb_event_broadcast(struct afb_event event, struct json_object *object) +{ + return event.itf->broadcast(event.closure, object); +} + +/* + * Pushes the 'event' with the data 'object' to its obeservers. + * 'object' can be NULL. + * + * For conveniency, the function calls 'json_object_put' for 'object'. + * Thus, in the case where 'object' should remain available after + * the function returns, the function 'json_object_get' shall be used. + * + * Returns the count of clients that received the event. + */ +static inline int afb_event_push(struct afb_event event, struct json_object *object) +{ + return event.itf->push(event.closure, object); +} + +/* + * Drops the data associated to the event + * After calling this function, the event + * MUST NOT BE USED ANYMORE. + */ +static inline void afb_event_drop(struct afb_event event) +{ + event.itf->drop(event.closure); +} + diff --git a/include/afb/afb-plugin.h b/include/afb/afb-plugin.h index 1eb3475b..85e8de7e 100644 --- a/include/afb/afb-plugin.h +++ b/include/afb/afb-plugin.h @@ -38,6 +38,7 @@ * Some function of the library are exported to afb-daemon. */ +#include #include /* @@ -140,11 +141,12 @@ struct sd_bus; * Definition of the facilities provided by the daemon. */ struct afb_daemon_itf { - void (*event_broadcast)(void *closure, const char *name, struct json_object *object); /* broadcasts evant 'name' with 'object' */ + int (*event_broadcast)(void *closure, const char *name, struct json_object *object); /* broadcasts evant 'name' with 'object' */ struct sd_event *(*get_event_loop)(void *closure); /* gets the common systemd's event loop */ struct sd_bus *(*get_user_bus)(void *closure); /* gets the common systemd's user d-bus */ struct sd_bus *(*get_system_bus)(void *closure); /* gets the common systemd's system d-bus */ void (*vverbose)(void*closure, int level, const char *file, int line, const char *fmt, va_list args); + struct afb_event (*event_make)(void *closure, const char *name); /* creates an event of 'name' */ }; /* @@ -206,12 +208,23 @@ static inline struct sd_bus *afb_daemon_get_system_bus(struct afb_daemon daemon) * For conveniency, the function calls 'json_object_put' for 'object'. * Thus, in the case where 'object' should remain available after * the function returns, the function 'json_object_get' shall be used. + * + * Returns the count of clients that received the event. */ -static inline void afb_daemon_broadcast_event(struct afb_daemon daemon, const char *name, struct json_object *object) +static inline int afb_daemon_broadcast_event(struct afb_daemon daemon, const char *name, struct json_object *object) { return daemon.itf->event_broadcast(daemon.closure, name, object); } +/* + * Creates an event of 'name' and returns it. + * 'daemon' MUST be the daemon given in interface when activating the plugin. + */ +static inline struct afb_event afb_daemon_make_event(struct afb_daemon daemon, const char *name) +{ + return daemon.itf->event_make(daemon.closure, name); +} + /* * Send a message described by 'fmt' and following parameters * to the journal for the verbosity 'level'. diff --git a/include/afb/afb-req-itf.h b/include/afb/afb-req-itf.h index f4fab551..2b3bc467 100644 --- a/include/afb/afb-req-itf.h +++ b/include/afb/afb-req-itf.h @@ -25,6 +25,8 @@ #include #include +#include + /* avoid inclusion of */ struct json_object; @@ -65,6 +67,9 @@ struct afb_req_itf { void (*session_close)(void *closure); int (*session_set_LOA)(void *closure, unsigned level); + + int (*subscribe)(void *closure, struct afb_event event); + int (*unsubscribe)(void *closure, struct afb_event event); }; /* @@ -315,6 +320,29 @@ static inline struct afb_req afb_req_unstore(struct afb_req *req) return result; } +/* + * Establishes for the client link identified by 'req' a subscription + * to the 'event'. + * Returns 0 in case of successful subscription or -1 in case of error. + */ +static inline int afb_req_subscribe(struct afb_req req, struct afb_event event) +{ + return req.itf->subscribe(req.closure, event); +} + +/* + * Revokes the subscription established to the 'event' for the client + * link identified by 'req'. + * Returns 0 in case of successful subscription or -1 in case of error. + */ +static inline int afb_req_unsubscribe(struct afb_req req, struct afb_event event) +{ + return req.itf->unsubscribe(req.closure, event); +} + + + + /* internal use */ static inline const char *afb_req_raw(struct afb_req req, size_t *size) { diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ca20665b..cbaf286f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -35,6 +35,7 @@ ADD_LIBRARY(afb-lib STATIC afb-apis.c afb-common.c afb-context.c + afb-evt.c afb-hreq.c afb-hsrv.c afb-hswitch.c diff --git a/src/afb-api-dbus.c b/src/afb-api-dbus.c index c0bc57a8..eda7985f 100644 --- a/src/afb-api-dbus.c +++ b/src/afb-api-dbus.c @@ -35,6 +35,7 @@ #include "afb-apis.h" #include "afb-api-so.h" #include "afb-context.h" +#include "afb-evt.h" #include "verbose.h" static const char DEFAULT_PATH_PREFIX[] = "/org/agl/afb/api/"; @@ -52,6 +53,7 @@ struct api_dbus char *path; /* path of the object for the API */ char *name; /* name/interface of the object */ char *api; /* api name of the interface */ + struct afb_evt_listener *listener; }; #define RETOK 1 @@ -302,7 +304,7 @@ static int api_dbus_client_on_event(sd_bus_message *m, void *userdata, sd_bus_er ERROR("unreadable event"); else { object = json_tokener_parse(data); - ctxClientEventSend(NULL, event, object); + afb_evt_broadcast(event, object); json_object_put(object); } return 1; @@ -444,7 +446,7 @@ static void dbus_req_send(struct dbus_req *dreq, const char *buffer, size_t size dbus_req_reply(dreq, RETRAW, buffer, ""); } -struct afb_req_itf dbus_req_itf = { +const struct afb_req_itf afb_api_dbus_req_itf = { .json = (void*)dbus_req_json, .get = (void*)dbus_req_get, .success = (void*)dbus_req_success, @@ -504,7 +506,7 @@ static int api_dbus_server_on_object_called(sd_bus_message *message, void *userd dreq->message = sd_bus_message_ref(message); dreq->json = NULL; dreq->refcount = 1; - areq.itf = &dbus_req_itf; + areq.itf = &afb_api_dbus_req_itf; areq.closure = dreq; afb_apis_call_(areq, &dreq->context, api->api, method); dbus_req_unref(dreq); @@ -522,19 +524,6 @@ static void afb_api_dbus_server_send_event(struct api_dbus *api, const char *eve json_object_put(object); } -static int afb_api_dbus_server_expects_event(struct api_dbus *api, const char *event) -{ - size_t len = strlen(api->api); - if (strncasecmp(event, api->api, len) != 0) - return 0; - return event[len] == '.'; -} - -static struct afb_event_listener_itf evitf = { - .send = (void*)afb_api_dbus_server_send_event, - .expects = (void*)afb_api_dbus_server_expects_event -}; - /* create the service */ int afb_api_dbus_add_server(const char *path) { @@ -563,7 +552,7 @@ int afb_api_dbus_add_server(const char *path) } INFO("afb service over dbus installed, name %s, path %s", api->name, api->path); - ctxClientEventListenerAdd(NULL, (struct afb_event_listener){ .itf = &evitf, .closure = api }); + api->listener = afb_evt_listener_create((void*)afb_api_dbus_server_send_event, api); return 0; error3: diff --git a/src/afb-api-dbus.h b/src/afb-api-dbus.h index c8a7bc3f..10f5f7ff 100644 --- a/src/afb-api-dbus.h +++ b/src/afb-api-dbus.h @@ -18,6 +18,10 @@ #pragma once +struct afb_req_itf; + +extern const struct afb_req_itf afb_api_dbus_req_itf; + extern int afb_api_dbus_add_client(const char *path); extern int afb_api_dbus_add_server(const char *path); diff --git a/src/afb-api-so.c b/src/afb-api-so.c index b741b13a..84f69753 100644 --- a/src/afb-api-so.c +++ b/src/afb-api-so.c @@ -30,6 +30,7 @@ #include #include +#include #include "session.h" #include "afb-common.h" @@ -37,6 +38,7 @@ #include "afb-apis.h" #include "afb-api-so.h" #include "afb-sig-handler.h" +#include "afb-evt.h" #include "verbose.h" /* @@ -58,18 +60,37 @@ void afb_api_so_set_timeout(int to) api_timeout = to; } +static struct afb_event afb_api_so_event_make(struct api_so_desc *desc, const char *name) +{ + size_t length; + char *event; + + /* makes the event name */ + assert(desc->plugin != NULL); + length = strlen(name); + event = alloca(length + 2 + desc->apilength); + memcpy(event, desc->plugin->v1.prefix, desc->apilength); + event[desc->apilength] = '/'; + memcpy(event + desc->apilength + 1, name, length + 1); + + /* crate the event */ + return afb_evt_create_event(event); +} + static int afb_api_so_event_broadcast(struct api_so_desc *desc, const char *name, struct json_object *object) { size_t length; char *event; + /* makes the event name */ assert(desc->plugin != NULL); length = strlen(name); event = alloca(length + 2 + desc->apilength); memcpy(event, desc->plugin->v1.prefix, desc->apilength); event[desc->apilength] = '/'; memcpy(event + desc->apilength + 1, name, length + 1); - return ctxClientEventSend(NULL, event, object); + + return afb_evt_broadcast(event, object); } static void afb_api_so_vverbose(struct api_so_desc *desc, int level, const char *file, int line, const char *fmt, va_list args) @@ -89,7 +110,8 @@ static const struct afb_daemon_itf daemon_itf = { .get_event_loop = (void*)afb_common_get_event_loop, .get_user_bus = (void*)afb_common_get_user_bus, .get_system_bus = (void*)afb_common_get_system_bus, - .vverbose = (void*)afb_api_so_vverbose + .vverbose = (void*)afb_api_so_vverbose, + .event_make = (void*)afb_api_so_event_make }; struct monitoring { diff --git a/src/afb-context.c b/src/afb-context.c index ba093c37..5fe32764 100644 --- a/src/afb-context.c +++ b/src/afb-context.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2015, 2016 "IoT.bzh" * Author "Fulup Ar Foll" + * Author José Bollo * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/afb-evt.c b/src/afb-evt.c new file mode 100644 index 00000000..00261b6c --- /dev/null +++ b/src/afb-evt.c @@ -0,0 +1,261 @@ +/* + * Copyright (C) 2015, 2016 "IoT.bzh" + * Author "Fulup Ar Foll" + * Author José Bollo + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define _GNU_SOURCE + +#include +#include +#include +#include + +#include +#include + +#include "afb-evt.h" + + +struct afb_evt_watch; + +struct afb_evt_listener { + struct afb_evt_listener *next; + void (*send)(void *closure, const char *event, struct json_object *object); + void *closure; + struct afb_evt_watch *watchs; + int refcount; +}; + +struct afb_evt_event { + struct afb_evt_watch *watchs; + char name[1]; +}; + +struct afb_evt_watch { + struct afb_evt_event *event; + struct afb_evt_watch *next_by_event; + struct afb_evt_listener *listener; + struct afb_evt_watch *next_by_listener; +}; + +static int evt_broadcast(struct afb_evt_event *evt, struct json_object *obj); +static int evt_push(struct afb_evt_event *evt, struct json_object *obj); +static void evt_drop(struct afb_evt_event *evt); + +static struct afb_event_itf afb_evt_event_itf = { + .broadcast = (void*)evt_broadcast, + .push = (void*)evt_push, + .drop = (void*)evt_drop +}; + +static struct afb_evt_listener *listeners = NULL; + +static inline int evt_trash(struct json_object *obj) +{ + return 0; +} + +static int evt_broadcast(struct afb_evt_event *evt, struct json_object *object) +{ + return afb_evt_broadcast(evt->name, object); +} + +int afb_evt_broadcast(const char *event, struct json_object *object) +{ + int result; + struct afb_evt_listener *listener; + + result = 0; + listener = listeners; + while(listener) { + listener->send(listener->closure, event, json_object_get(object)); + listener = listener->next; + } + json_object_put(object); + return result; +} + +static int evt_push(struct afb_evt_event *evt, struct json_object *obj) +{ + int result; + struct afb_evt_watch *watch; + struct afb_evt_listener *listener; + + result = 0; + watch = evt->watchs; + while(listener) { + listener = watch->listener; + listener->send(listener->closure, evt->name, json_object_get(obj)); + watch = watch->next_by_event; + } + json_object_put(obj); + return result; +} + +static void remove_watch(struct afb_evt_watch *watch) +{ + struct afb_evt_watch **prv; + + prv = &watch->event->watchs; + while(*prv != watch) + prv = &(*prv)->next_by_event; + *prv = watch->next_by_event; + + prv = &watch->listener->watchs; + while(*prv != watch) + prv = &(*prv)->next_by_listener; + *prv = watch->next_by_listener; + + free(watch); +} + +static void evt_drop(struct afb_evt_event *evt) +{ + if (evt != NULL) { + while(evt->watchs != NULL) + remove_watch(evt->watchs); + free(evt); + } +} + +struct afb_event afb_evt_create_event(const char *name) +{ + size_t len; + struct afb_evt_event *evt; + + len = strlen(name); + evt = malloc(len + sizeof * evt); + if (evt != NULL) { + evt->watchs = NULL; + memcpy(evt->name, name, len + 1); + } + return (struct afb_event){ .itf = &afb_evt_event_itf, .closure = evt }; +} + +struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, const char *event, struct json_object *object), void *closure) +{ + struct afb_evt_listener *listener; + + /* search if an instance already exists */ + listener = listeners; + while (listener != NULL) { + if (listener->send == send && listener->closure == closure) + return afb_evt_listener_addref(listener); + listener = listener->next; + } + + /* allocates */ + listener = calloc(1, sizeof *listener); + if (listener != NULL) { + /* init */ + listener->next = listeners; + listener->send = send; + listener->closure = closure; + listener->watchs = NULL; + listener->refcount = 1; + listeners = listener; + } + return listener; +} + +struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener) +{ + listener->refcount++; + return listener; +} + +void afb_evt_listener_unref(struct afb_evt_listener *listener) +{ + if (0 == --listener->refcount) { + struct afb_evt_listener **prv; + + /* remove the watchers */ + while (listener->watchs != NULL) + remove_watch(listener->watchs); + + /* unlink the listener */ + prv = &listeners; + while (*prv != listener) + prv = &(*prv)->next; + *prv = listener->next; + + /* free the listener */ + free(listener); + } +} + +int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event) +{ + struct afb_evt_watch *watch; + struct afb_evt_event *evt; + + /* check parameter */ + if (event.itf != &afb_evt_event_itf) { + errno = EINVAL; + return -1; + } + + /* search the existing watch */ + watch = listener->watchs; + while(watch != NULL) { + if (watch->event == event.closure) + return 0; + watch = watch->next_by_listener; + } + + /* not found, allocate a new */ + watch = malloc(sizeof *watch); + if (watch == NULL) { + errno = ENOMEM; + return -1; + } + + /* initialise and link */ + evt = event.closure; + watch->event = evt; + watch->next_by_event = evt->watchs; + watch->listener = listener; + watch->next_by_listener = listener->watchs; + evt->watchs = watch; + listener->watchs = watch; + + return 0; +} + +int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event event) +{ + struct afb_evt_watch *watch; + + /* check parameter */ + if (event.itf != &afb_evt_event_itf) { + errno = EINVAL; + return -1; + } + + /* search the existing watch */ + watch = listener->watchs; + while(watch != NULL) { + if (watch->event == event.closure) { + /* found: remove it */ + remove_watch(watch); + break; + } + watch = watch->next_by_listener; + } + return 0; +} + + diff --git a/src/afb-evt.h b/src/afb-evt.h new file mode 100644 index 00000000..8e102546 --- /dev/null +++ b/src/afb-evt.h @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2016 "IoT.bzh" + * Author: José Bollo + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +struct afb_event; +struct AFB_clientCtx; + +struct afb_evt_listener; + +extern struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, const char *event, struct json_object *object), void *closure); + +extern int afb_evt_broadcast(const char *event, struct json_object *object); + +extern struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener); +extern void afb_evt_listener_unref(struct afb_evt_listener *listener); + +extern struct afb_event afb_evt_create_event(const char *name); + +extern int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event); +extern int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event event); + diff --git a/src/afb-hreq.c b/src/afb-hreq.c index 4ca8441e..1e38b412 100644 --- a/src/afb-hreq.c +++ b/src/afb-hreq.c @@ -74,8 +74,9 @@ static void req_fail(struct afb_hreq *hreq, const char *status, const char *info static void req_success(struct afb_hreq *hreq, json_object *obj, const char *info); static const char *req_raw(struct afb_hreq *hreq, size_t *size); static void req_send(struct afb_hreq *hreq, const char *buffer, size_t size); +static int req_subscribe_unsubscribe_error(struct afb_hreq *hreq, struct afb_event event); -static const struct afb_req_itf afb_hreq_itf = { +const struct afb_req_itf afb_hreq_req_itf = { .json = (void*)req_json, .get = (void*)req_get, .success = (void*)req_success, @@ -87,7 +88,9 @@ static const struct afb_req_itf afb_hreq_itf = { .addref = (void*)afb_hreq_addref, .unref = (void*)afb_hreq_unref, .session_close = (void*)afb_context_close, - .session_set_LOA = (void*)afb_context_change_loa + .session_set_LOA = (void*)afb_context_change_loa, + .subscribe = (void*)req_subscribe_unsubscribe_error, + .unsubscribe = (void*)req_subscribe_unsubscribe_error }; static struct hreq_data *get_data(struct afb_hreq *hreq, const char *key, int create) @@ -697,7 +700,7 @@ int afb_hreq_post_add_file(struct afb_hreq *hreq, const char *key, const char *f struct afb_req afb_hreq_to_req(struct afb_hreq *hreq) { - return (struct afb_req){ .itf = &afb_hreq_itf, .closure = hreq }; + return (struct afb_req){ .itf = &afb_hreq_req_itf, .closure = hreq }; } static struct afb_arg req_get(struct afb_hreq *hreq, const char *name) @@ -798,6 +801,12 @@ static void req_success(struct afb_hreq *hreq, json_object *obj, const char *inf req_reply(hreq, MHD_HTTP_OK, "success", info, obj); } +static int req_subscribe_unsubscribe_error(struct afb_hreq *hreq, struct afb_event event) +{ + errno = EINVAL; + return -1; +} + int afb_hreq_init_context(struct afb_hreq *hreq) { const char *uuid; diff --git a/src/afb-hreq.h b/src/afb-hreq.h index 836f4703..772cd677 100644 --- a/src/afb-hreq.h +++ b/src/afb-hreq.h @@ -21,6 +21,9 @@ struct AFB_clientCtx; struct json_object; struct hreq_data; struct afb_hsrv; +struct afb_req_itf; + +extern const struct afb_req_itf afb_hreq_req_itf; struct afb_hreq { /* diff --git a/src/afb-ws-json1.c b/src/afb-ws-json1.c index ffb6b81f..5ef751f1 100644 --- a/src/afb-ws-json1.c +++ b/src/afb-ws-json1.c @@ -32,6 +32,7 @@ #include #include "afb-apis.h" #include "afb-context.h" +#include "afb-evt.h" #include "verbose.h" static void aws_on_hangup(struct afb_ws_json1 *ws, struct afb_wsj1 *wsj1); @@ -50,22 +51,13 @@ struct afb_ws_json1 void (*cleanup)(void*); void *cleanup_closure; struct AFB_clientCtx *session; + struct afb_evt_listener *listener; struct afb_wsj1 *wsj1; int new_session; }; static void aws_send_event(struct afb_ws_json1 *ws, const char *event, struct json_object *object); -static const struct afb_event_listener_itf event_listener_itf = { - .send = (void*)aws_send_event, - .expects = NULL -}; - -static inline struct afb_event_listener listener_for(struct afb_ws_json1 *aws) -{ - return (struct afb_event_listener){ .itf = &event_listener_itf, .closure = aws }; -} - struct afb_ws_json1 *afb_ws_json1_create(int fd, struct afb_context *context, void (*cleanup)(void*), void *cleanup_closure) { struct afb_ws_json1 *result; @@ -89,7 +81,8 @@ struct afb_ws_json1 *afb_ws_json1_create(int fd, struct afb_context *context, vo if (result->wsj1 == NULL) goto error3; - if (0 > ctxClientEventListenerAdd(result->session, listener_for(result))) + result->listener = afb_evt_listener_create((void*)aws_send_event, result); + if (result->listener == NULL) goto error4; return result; @@ -114,7 +107,7 @@ static struct afb_ws_json1 *aws_addref(struct afb_ws_json1 *ws) static void aws_unref(struct afb_ws_json1 *ws) { if (--ws->refcount == 0) { - ctxClientEventListenerRemove(ws->session, listener_for(ws)); + afb_evt_listener_unref(ws->listener); afb_wsj1_unref(ws->wsj1); if (ws->cleanup != NULL) ws->cleanup(ws->cleanup_closure); @@ -149,9 +142,10 @@ static void wsreq_fail(struct afb_wsreq *wsreq, const char *status, const char * static void wsreq_success(struct afb_wsreq *wsreq, struct json_object *obj, const char *info); static const char *wsreq_raw(struct afb_wsreq *wsreq, size_t *size); static void wsreq_send(struct afb_wsreq *wsreq, const char *buffer, size_t size); +static int wsreq_subscribe(struct afb_wsreq *wsreq, struct afb_event event); +static int wsreq_unsubscribe(struct afb_wsreq *wsreq, struct afb_event event); - -static const struct afb_req_itf wsreq_itf = { +const struct afb_req_itf afb_ws_json1_req_itf = { .json = (void*)wsreq_json, .get = (void*)wsreq_get, .success = (void*)wsreq_success, @@ -163,7 +157,9 @@ static const struct afb_req_itf wsreq_itf = { .addref = (void*)wsreq_addref, .unref = (void*)wsreq_unref, .session_close = (void*)afb_context_close, - .session_set_LOA = (void*)afb_context_change_loa + .session_set_LOA = (void*)afb_context_change_loa, + .subscribe = (void*)wsreq_subscribe, + .unsubscribe = (void*)wsreq_unsubscribe }; static void aws_on_call(struct afb_ws_json1 *ws, const char *api, const char *verb, struct afb_wsj1_msg *msg) @@ -197,7 +193,7 @@ static void aws_on_call(struct afb_ws_json1 *ws, const char *api, const char *ve /* emits the call */ r.closure = wsreq; - r.itf = &wsreq_itf; + r.itf = &afb_ws_json1_req_itf; afb_apis_call_(r, &wsreq->context, api, verb); wsreq_unref(wsreq); } @@ -276,3 +272,13 @@ static void aws_send_event(struct afb_ws_json1 *aws, const char *event, struct j afb_wsj1_send_event_j(aws->wsj1, event, afb_msg_json_event(event, object)); } +static int wsreq_subscribe(struct afb_wsreq *wsreq, struct afb_event event) +{ + return afb_evt_add_watch(wsreq->aws->listener, event); +} + +static int wsreq_unsubscribe(struct afb_wsreq *wsreq, struct afb_event event) +{ + return afb_evt_remove_watch(wsreq->aws->listener, event); +} + diff --git a/src/afb-ws-json1.h b/src/afb-ws-json1.h index fedbcf6c..f714c222 100644 --- a/src/afb-ws-json1.h +++ b/src/afb-ws-json1.h @@ -19,6 +19,9 @@ struct afb_ws_json1; struct afb_context; +struct afb_req_itf; + +extern const struct afb_req_itf afb_ws_json1_req_itf; extern struct afb_ws_json1 *afb_ws_json1_create(int fd, struct afb_context *context, void (*cleanup)(void*), void *closure); diff --git a/src/session.c b/src/session.c index 16dc8369..e0d0a8e6 100644 --- a/src/session.c +++ b/src/session.c @@ -38,13 +38,6 @@ struct client_value void (*free_value)(void*); }; -struct afb_event_listener_list -{ - struct afb_event_listener_list *next; - struct afb_event_listener listener; - int refcount; -}; - struct AFB_clientCtx { unsigned refcount; @@ -54,7 +47,6 @@ struct AFB_clientCtx char uuid[37]; // long term authentication of remote client char token[37]; // short term authentication of remote client struct client_value *values; - struct afb_event_listener_list *listeners; }; // Session UUID are store in a simple array [for 10 sessions this should be enough] @@ -66,7 +58,6 @@ static struct { int timeout; int apicount; char initok[37]; - struct afb_event_listener_list *listeners; } sessions; /* generate a uuid */ @@ -289,8 +280,6 @@ void ctxClientClose (struct AFB_clientCtx *clientCtx) if (clientCtx->uuid[0] != 0) { clientCtx->uuid[0] = 0; ctxUuidFreeCB (clientCtx); - while(clientCtx->listeners != NULL) - ctxClientEventListenerRemove(clientCtx, clientCtx->listeners->listener); if (clientCtx->refcount == 0) { ctxStoreDel (clientCtx); free(clientCtx); @@ -326,103 +315,6 @@ void ctxTokenNew (struct AFB_clientCtx *clientCtx) clientCtx->expiration = NOW + sessions.timeout; } -static int add_listener(struct afb_event_listener_list **head, struct afb_event_listener listener) -{ - struct afb_event_listener_list *iter, **prv; - - prv = head; - for (;;) { - iter = *prv; - if (iter == NULL) { - iter = calloc(1, sizeof *iter); - if (iter == NULL) { - errno = ENOMEM; - return -1; - } - iter->listener = listener; - iter->refcount = 1; - *prv = iter; - return 0; - } - if (iter->listener.itf == listener.itf && iter->listener.closure == listener.closure) { - iter->refcount++; - return 0; - } - prv = &iter->next; - } -} - -int ctxClientEventListenerAdd(struct AFB_clientCtx *clientCtx, struct afb_event_listener listener) -{ - return add_listener(clientCtx != NULL ? &clientCtx->listeners : &sessions.listeners, listener); -} - -static void remove_listener(struct afb_event_listener_list **head, struct afb_event_listener listener) -{ - struct afb_event_listener_list *iter, **prv; - - prv = head; - for (;;) { - iter = *prv; - if (iter == NULL) - return; - if (iter->listener.itf == listener.itf && iter->listener.closure == listener.closure) { - if (!--iter->refcount) { - *prv = iter->next; - free(iter); - } - return; - } - prv = &iter->next; - } -} - -void ctxClientEventListenerRemove(struct AFB_clientCtx *clientCtx, struct afb_event_listener listener) -{ - remove_listener(clientCtx != NULL ? &clientCtx->listeners : &sessions.listeners, listener); -} - -static int send(struct afb_event_listener_list *head, const char *event, struct json_object *object) -{ - struct afb_event_listener_list *iter; - int result; - - result = 0; - iter = head; - while (iter != NULL) { - if (iter->listener.itf->expects == NULL || iter->listener.itf->expects(iter->listener.closure, event)) { - iter->listener.itf->send(iter->listener.closure, event, json_object_get(object)); - result++; - } - iter = iter->next; - } - - return result; -} - -int ctxClientEventSend(struct AFB_clientCtx *clientCtx, const char *event, struct json_object *object) -{ - long idx; - time_t now; - int result; - - now = NOW; - if (clientCtx != NULL) { - result = ctxIsActive(clientCtx, now) ? send(clientCtx->listeners, event, object) : 0; - } else { - result = send(sessions.listeners, event, object); - for (idx=0; idx < sessions.max; idx++) { - clientCtx = ctxClientAddRef(sessions.store[idx]); - if (clientCtx != NULL && ctxIsActive(clientCtx, now)) { - clientCtx = ctxClientAddRef(clientCtx); - result += send(clientCtx->listeners, event, object); - } - ctxClientUnref(clientCtx); - } - } - return result; -} - const char *ctxClientGetUuid (struct AFB_clientCtx *clientCtx) { assert(clientCtx != NULL); diff --git a/src/session.h b/src/session.h index af074100..497951af 100644 --- a/src/session.h +++ b/src/session.h @@ -20,18 +20,6 @@ struct json_object; struct AFB_clientCtx; -struct afb_event_listener_itf -{ - void (*send)(void *closure, const char *event, struct json_object *object); - int (*expects)(void *closure, const char *event); -}; - -struct afb_event_listener -{ - const struct afb_event_listener_itf *itf; - void *closure; -}; - extern void ctxStoreInit (int max_session_count, int timeout, const char *initok, int context_count); extern struct AFB_clientCtx *ctxClientGetSession (const char *uuid, int *created); @@ -39,10 +27,6 @@ extern struct AFB_clientCtx *ctxClientAddRef(struct AFB_clientCtx *clientCtx); extern void ctxClientUnref(struct AFB_clientCtx *clientCtx); extern void ctxClientClose (struct AFB_clientCtx *clientCtx); -extern int ctxClientEventListenerAdd(struct AFB_clientCtx *clientCtx, struct afb_event_listener listener); -extern void ctxClientEventListenerRemove(struct AFB_clientCtx *clientCtx, struct afb_event_listener listener); -extern int ctxClientEventSend(struct AFB_clientCtx *clientCtx, const char *event, struct json_object *object); - extern int ctxTokenCheck (struct AFB_clientCtx *clientCtx, const char *token); extern void ctxTokenNew (struct AFB_clientCtx *clientCtx); -- 2.16.6