Events: refactoring 29/5929/1
authorJosé Bollo <jose.bollo@iot.bzh>
Thu, 9 Jun 2016 05:54:31 +0000 (07:54 +0200)
committerJosé Bollo <jose.bollo@iot.bzh>
Thu, 9 Jun 2016 06:39:02 +0000 (08:39 +0200)
This new version allows to subscribe a client for
an event.

The event should first be created for the API
(the API's prefix is added) using 'afb_daemon_make_event'.

After that, plugins can subscribe or unsubscribe their
clients (identified through requests) to the events that
it generates. See 'afb_req_subscribe' and 'afb_req_unsubscribe'.

Events created by 'afb_daemon_make_event' can be widely
broadcasted using 'afb_event_broadcast' or pushed only to
suscribers using 'afb_event_push'.

Events can be destroyed using 'afb_event_drop'.

Change-Id: I7c0bed5e625c2052dcd81c6bfe960def1fa032f3
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
16 files changed:
include/afb/afb-event-itf.h [new file with mode: 0644]
include/afb/afb-plugin.h
include/afb/afb-req-itf.h
src/CMakeLists.txt
src/afb-api-dbus.c
src/afb-api-dbus.h
src/afb-api-so.c
src/afb-context.c
src/afb-evt.c [new file with mode: 0644]
src/afb-evt.h [new file with mode: 0644]
src/afb-hreq.c
src/afb-hreq.h
src/afb-ws-json1.c
src/afb-ws-json1.h
src/session.c
src/session.h

diff --git a/include/afb/afb-event-itf.h b/include/afb/afb-event-itf.h
new file mode 100644 (file)
index 0000000..47ffa38
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * Copyright (C) 2016 "IoT.bzh"
+ * Author: José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+/* avoid inclusion of <json-c/json.h> */
+struct json_object;
+
+/*
+ * Interface for handling requests.
+ * It records the functions to be called for the request.
+ * Don't use this structure directly.
+ * Use the helper functions documented below.
+ */ 
+struct afb_event_itf {
+       /* CAUTION: respect the order, add at the end */
+
+       int (*broadcast)(void *closure, struct json_object *obj);
+       int (*push)(void *closure, struct json_object *obj);
+       void (*drop)(void *closure);
+};
+
+/*
+ * Describes the request by plugins from afb-daemon
+ */
+struct afb_event {
+       const struct afb_event_itf *itf;        /* the interface to use */
+       void *closure;                          /* the closure argument for functions of 'itf' */
+};
+
+/*
+ * Broadcasts widely the 'event' with the data 'object'.
+ * 'object' can be NULL.
+ *
+ * For conveniency, the function calls 'json_object_put' for 'object'.
+ * Thus, in the case where 'object' should remain available after
+ * the function returns, the function 'json_object_get' shall be used.
+ *
+ * Returns the count of clients that received the event.
+ */
+static inline int afb_event_broadcast(struct afb_event event, struct json_object *object)
+{
+       return event.itf->broadcast(event.closure, object);
+}
+
+/*
+ * Pushes the 'event' with the data 'object' to its obeservers.
+ * 'object' can be NULL.
+ *
+ * For conveniency, the function calls 'json_object_put' for 'object'.
+ * Thus, in the case where 'object' should remain available after
+ * the function returns, the function 'json_object_get' shall be used.
+ *
+ * Returns the count of clients that received the event.
+ */
+static inline int afb_event_push(struct afb_event event, struct json_object *object)
+{
+       return event.itf->push(event.closure, object);
+}
+
+/*
+ * Drops the data associated to the event
+ * After calling this function, the event
+ * MUST NOT BE USED ANYMORE.
+ */
+static inline void afb_event_drop(struct afb_event event)
+{
+       event.itf->drop(event.closure);
+}
+
index 1eb3475..85e8de7 100644 (file)
@@ -38,6 +38,7 @@
  * Some function of the library are exported to afb-daemon.
  */
 
+#include <afb/afb-event-itf.h>
 #include <afb/afb-req-itf.h>
 
 /*
@@ -140,11 +141,12 @@ struct sd_bus;
  * Definition of the facilities provided by the daemon.
  */
 struct afb_daemon_itf {
-       void (*event_broadcast)(void *closure, const char *name, struct json_object *object); /* broadcasts evant 'name' with 'object' */
+       int (*event_broadcast)(void *closure, const char *name, struct json_object *object); /* broadcasts evant 'name' with 'object' */
        struct sd_event *(*get_event_loop)(void *closure);      /* gets the common systemd's event loop */
        struct sd_bus *(*get_user_bus)(void *closure);          /* gets the common systemd's user d-bus */
        struct sd_bus *(*get_system_bus)(void *closure);        /* gets the common systemd's system d-bus */
        void (*vverbose)(void*closure, int level, const char *file, int line, const char *fmt, va_list args);
+       struct afb_event (*event_make)(void *closure, const char *name); /* creates an event of 'name' */
 };
 
 /*
@@ -206,12 +208,23 @@ static inline struct sd_bus *afb_daemon_get_system_bus(struct afb_daemon daemon)
  * For conveniency, the function calls 'json_object_put' for 'object'.
  * Thus, in the case where 'object' should remain available after
  * the function returns, the function 'json_object_get' shall be used.
+ *
+ * Returns the count of clients that received the event.
  */
-static inline void afb_daemon_broadcast_event(struct afb_daemon daemon, const char *name, struct json_object *object)
+static inline int afb_daemon_broadcast_event(struct afb_daemon daemon, const char *name, struct json_object *object)
 {
        return daemon.itf->event_broadcast(daemon.closure, name, object);
 }
 
+/*
+ * Creates an event of 'name' and returns it.
+ * 'daemon' MUST be the daemon given in interface when activating the plugin.
+ */
+static inline struct afb_event afb_daemon_make_event(struct afb_daemon daemon, const char *name)
+{
+       return daemon.itf->event_make(daemon.closure, name);
+}
+
 /*
  * Send a message described by 'fmt' and following parameters
  * to the journal for the verbosity 'level'.
index f4fab55..2b3bc46 100644 (file)
@@ -25,6 +25,8 @@
 #include <stdarg.h>
 #include <stdio.h>
 
+#include <afb/afb-event-itf.h>
+
 /* avoid inclusion of <json-c/json.h> */
 struct json_object;
 
@@ -65,6 +67,9 @@ struct afb_req_itf {
 
        void (*session_close)(void *closure);
        int (*session_set_LOA)(void *closure, unsigned level);
+
+       int (*subscribe)(void *closure, struct afb_event event);
+       int (*unsubscribe)(void *closure, struct afb_event event);
 };
 
 /*
@@ -315,6 +320,29 @@ static inline struct afb_req afb_req_unstore(struct afb_req *req)
        return result;
 }
 
+/*
+ * Establishes for the client link identified by 'req' a subscription
+ * to the 'event'.
+ * Returns 0 in case of successful subscription or -1 in case of error.
+ */
+static inline int afb_req_subscribe(struct afb_req req, struct afb_event event)
+{
+       return req.itf->subscribe(req.closure, event);
+}
+
+/*
+ * Revokes the subscription established to the 'event' for the client
+ * link identified by 'req'.
+ * Returns 0 in case of successful subscription or -1 in case of error.
+ */
+static inline int afb_req_unsubscribe(struct afb_req req, struct afb_event event)
+{
+       return req.itf->unsubscribe(req.closure, event);
+}
+
+
+
+
 /* internal use */
 static inline const char *afb_req_raw(struct afb_req req, size_t *size)
 {
index ca20665..cbaf286 100644 (file)
@@ -35,6 +35,7 @@ ADD_LIBRARY(afb-lib STATIC
        afb-apis.c
        afb-common.c
        afb-context.c
+       afb-evt.c
        afb-hreq.c
        afb-hsrv.c
        afb-hswitch.c
index c0bc57a..eda7985 100644 (file)
@@ -35,6 +35,7 @@
 #include "afb-apis.h"
 #include "afb-api-so.h"
 #include "afb-context.h"
+#include "afb-evt.h"
 #include "verbose.h"
 
 static const char DEFAULT_PATH_PREFIX[] = "/org/agl/afb/api/";
@@ -52,6 +53,7 @@ struct api_dbus
        char *path;             /* path of the object for the API */
        char *name;             /* name/interface of the object */
        char *api;              /* api name of the interface */
+       struct afb_evt_listener *listener;
 };
 
 #define RETOK   1
@@ -302,7 +304,7 @@ static int api_dbus_client_on_event(sd_bus_message *m, void *userdata, sd_bus_er
                ERROR("unreadable event");
        else {
                object = json_tokener_parse(data);
-               ctxClientEventSend(NULL, event, object);
+               afb_evt_broadcast(event, object);
                json_object_put(object);
        }
        return 1;
@@ -444,7 +446,7 @@ static void dbus_req_send(struct dbus_req *dreq, const char *buffer, size_t size
        dbus_req_reply(dreq, RETRAW, buffer, "");
 }
 
-struct afb_req_itf dbus_req_itf = {
+const struct afb_req_itf afb_api_dbus_req_itf = {
        .json = (void*)dbus_req_json,
        .get = (void*)dbus_req_get,
        .success = (void*)dbus_req_success,
@@ -504,7 +506,7 @@ static int api_dbus_server_on_object_called(sd_bus_message *message, void *userd
        dreq->message = sd_bus_message_ref(message);
        dreq->json = NULL;
        dreq->refcount = 1;
-       areq.itf = &dbus_req_itf;
+       areq.itf = &afb_api_dbus_req_itf;
        areq.closure = dreq;
        afb_apis_call_(areq, &dreq->context, api->api, method);
        dbus_req_unref(dreq);
@@ -522,19 +524,6 @@ static void afb_api_dbus_server_send_event(struct api_dbus *api, const char *eve
        json_object_put(object);
 }
 
-static int afb_api_dbus_server_expects_event(struct api_dbus *api, const char *event)
-{
-       size_t len = strlen(api->api);
-       if (strncasecmp(event, api->api, len) != 0)
-               return 0;
-       return event[len] == '.';
-}
-
-static struct afb_event_listener_itf evitf = {
-       .send = (void*)afb_api_dbus_server_send_event,
-       .expects = (void*)afb_api_dbus_server_expects_event
-};
-
 /* create the service */
 int afb_api_dbus_add_server(const char *path)
 {
@@ -563,7 +552,7 @@ int afb_api_dbus_add_server(const char *path)
        }
        INFO("afb service over dbus installed, name %s, path %s", api->name, api->path);
 
-       ctxClientEventListenerAdd(NULL, (struct afb_event_listener){ .itf = &evitf, .closure = api });
+       api->listener = afb_evt_listener_create((void*)afb_api_dbus_server_send_event, api);
 
        return 0;
 error3:
index c8a7bc3..10f5f7f 100644 (file)
 
 #pragma once
 
+struct afb_req_itf;
+
+extern const struct afb_req_itf afb_api_dbus_req_itf;
+
 extern int afb_api_dbus_add_client(const char *path);
 
 extern int afb_api_dbus_add_server(const char *path);
index b741b13..84f6975 100644 (file)
@@ -30,6 +30,7 @@
 
 #include <afb/afb-plugin.h>
 #include <afb/afb-req-itf.h>
+#include <afb/afb-event-itf.h>
 
 #include "session.h"
 #include "afb-common.h"
@@ -37,6 +38,7 @@
 #include "afb-apis.h"
 #include "afb-api-so.h"
 #include "afb-sig-handler.h"
+#include "afb-evt.h"
 #include "verbose.h"
 
 /*
@@ -58,18 +60,37 @@ void afb_api_so_set_timeout(int to)
        api_timeout = to;
 }
 
+static struct afb_event afb_api_so_event_make(struct api_so_desc *desc, const char *name)
+{
+       size_t length;
+       char *event;
+
+       /* makes the event name */
+       assert(desc->plugin != NULL);
+       length = strlen(name);
+       event = alloca(length + 2 + desc->apilength);
+       memcpy(event, desc->plugin->v1.prefix, desc->apilength);
+       event[desc->apilength] = '/';
+       memcpy(event + desc->apilength + 1, name, length + 1);
+
+       /* crate the event */
+       return afb_evt_create_event(event);
+}
+
 static int afb_api_so_event_broadcast(struct api_so_desc *desc, const char *name, struct json_object *object)
 {
        size_t length;
        char *event;
 
+       /* makes the event name */
        assert(desc->plugin != NULL);
        length = strlen(name);
        event = alloca(length + 2 + desc->apilength);
        memcpy(event, desc->plugin->v1.prefix, desc->apilength);
        event[desc->apilength] = '/';
        memcpy(event + desc->apilength + 1, name, length + 1);
-       return ctxClientEventSend(NULL, event, object);
+
+       return afb_evt_broadcast(event, object);
 }
 
 static void afb_api_so_vverbose(struct api_so_desc *desc, int level, const char *file, int line, const char *fmt, va_list args)
@@ -89,7 +110,8 @@ static const struct afb_daemon_itf daemon_itf = {
        .get_event_loop = (void*)afb_common_get_event_loop,
        .get_user_bus = (void*)afb_common_get_user_bus,
        .get_system_bus = (void*)afb_common_get_system_bus,
-       .vverbose = (void*)afb_api_so_vverbose
+       .vverbose = (void*)afb_api_so_vverbose,
+       .event_make = (void*)afb_api_so_event_make
 };
 
 struct monitoring {
index ba093c3..5fe3276 100644 (file)
@@ -1,6 +1,7 @@
 /*
  * Copyright (C) 2015, 2016 "IoT.bzh"
  * Author "Fulup Ar Foll"
+ * Author José Bollo <jose.bollo@iot.bzh>
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/src/afb-evt.c b/src/afb-evt.c
new file mode 100644 (file)
index 0000000..00261b6
--- /dev/null
@@ -0,0 +1,261 @@
+/*
+ * Copyright (C) 2015, 2016 "IoT.bzh"
+ * Author "Fulup Ar Foll"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <errno.h>
+
+#include <json-c/json.h>
+#include <afb/afb-event-itf.h>
+
+#include "afb-evt.h"
+
+
+struct afb_evt_watch;
+
+struct afb_evt_listener {
+       struct afb_evt_listener *next;
+       void (*send)(void *closure, const char *event, struct json_object *object);
+       void *closure;
+       struct afb_evt_watch *watchs;
+       int refcount;
+};
+
+struct afb_evt_event {
+       struct afb_evt_watch *watchs;
+       char name[1];
+};
+
+struct afb_evt_watch {
+       struct afb_evt_event *event;
+       struct afb_evt_watch *next_by_event;
+       struct afb_evt_listener *listener;
+       struct afb_evt_watch *next_by_listener;
+};
+
+static int evt_broadcast(struct afb_evt_event *evt, struct json_object *obj);
+static int evt_push(struct afb_evt_event *evt, struct json_object *obj);
+static void evt_drop(struct afb_evt_event *evt);
+
+static struct afb_event_itf afb_evt_event_itf = {
+       .broadcast = (void*)evt_broadcast,
+       .push = (void*)evt_push,
+       .drop = (void*)evt_drop
+};
+
+static struct afb_evt_listener *listeners = NULL;
+
+static inline int evt_trash(struct json_object *obj)
+{
+       return 0;
+}
+
+static int evt_broadcast(struct afb_evt_event *evt, struct json_object *object)
+{
+       return afb_evt_broadcast(evt->name, object);
+}
+
+int afb_evt_broadcast(const char *event, struct json_object *object)
+{
+       int result;
+       struct afb_evt_listener *listener;
+
+       result = 0;
+       listener = listeners;
+       while(listener) {
+               listener->send(listener->closure, event, json_object_get(object));
+               listener = listener->next;
+       }
+       json_object_put(object);
+       return result;
+}
+
+static int evt_push(struct afb_evt_event *evt, struct json_object *obj)
+{
+       int result;
+       struct afb_evt_watch *watch;
+       struct afb_evt_listener *listener;
+
+       result = 0;
+       watch = evt->watchs;
+       while(listener) {
+               listener = watch->listener;
+               listener->send(listener->closure, evt->name, json_object_get(obj));
+               watch = watch->next_by_event;
+       }
+       json_object_put(obj);
+       return result;
+}
+
+static void remove_watch(struct afb_evt_watch *watch)
+{
+       struct afb_evt_watch **prv;
+
+       prv = &watch->event->watchs;
+       while(*prv != watch)
+               prv = &(*prv)->next_by_event;
+       *prv = watch->next_by_event;
+
+       prv = &watch->listener->watchs;
+       while(*prv != watch)
+               prv = &(*prv)->next_by_listener;
+       *prv = watch->next_by_listener;
+
+       free(watch);
+}
+
+static void evt_drop(struct afb_evt_event *evt)
+{
+       if (evt != NULL) {
+               while(evt->watchs != NULL)
+                       remove_watch(evt->watchs);
+               free(evt);
+       }
+}
+
+struct afb_event afb_evt_create_event(const char *name)
+{
+       size_t len;
+       struct afb_evt_event *evt;
+
+       len = strlen(name);
+       evt = malloc(len + sizeof * evt);
+       if (evt != NULL) {
+               evt->watchs = NULL;
+               memcpy(evt->name, name, len + 1);
+       }
+       return (struct afb_event){ .itf = &afb_evt_event_itf, .closure = evt };
+}
+
+struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, const char *event, struct json_object *object), void *closure)
+{
+       struct afb_evt_listener *listener;
+
+       /* search if an instance already exists */
+       listener = listeners;
+       while (listener != NULL) {
+               if (listener->send == send && listener->closure == closure)
+                       return afb_evt_listener_addref(listener);
+               listener = listener->next;
+       }
+
+       /* allocates */
+       listener = calloc(1, sizeof *listener);
+       if (listener != NULL) {
+               /* init */
+               listener->next = listeners;
+               listener->send = send;
+               listener->closure = closure;
+               listener->watchs = NULL;
+               listener->refcount = 1;
+               listeners = listener;
+       }
+       return listener;
+}
+
+struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener)
+{
+       listener->refcount++;
+       return listener;
+}
+
+void afb_evt_listener_unref(struct afb_evt_listener *listener)
+{
+       if (0 == --listener->refcount) {
+               struct afb_evt_listener **prv;
+
+               /* remove the watchers */
+               while (listener->watchs != NULL)
+                       remove_watch(listener->watchs);
+
+               /* unlink the listener */
+               prv = &listeners;
+               while (*prv != listener)
+                       prv = &(*prv)->next;
+               *prv = listener->next;
+
+               /* free the listener */
+               free(listener);
+       }
+}
+
+int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event)
+{
+       struct afb_evt_watch *watch;
+       struct afb_evt_event *evt;
+
+       /* check parameter */
+       if (event.itf != &afb_evt_event_itf) {
+               errno = EINVAL;
+               return -1;
+       }
+
+       /* search the existing watch */
+       watch = listener->watchs;
+       while(watch != NULL) {
+               if (watch->event == event.closure)
+                       return 0;
+               watch = watch->next_by_listener;
+       }
+
+       /* not found, allocate a new */
+       watch = malloc(sizeof *watch);
+       if (watch == NULL) {
+               errno = ENOMEM;
+               return -1;
+       }
+
+       /* initialise and link */
+       evt = event.closure;
+       watch->event = evt;
+       watch->next_by_event = evt->watchs;
+       watch->listener = listener;
+       watch->next_by_listener = listener->watchs;
+       evt->watchs = watch;
+       listener->watchs = watch;
+       
+       return 0;
+}
+
+int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event event)
+{
+       struct afb_evt_watch *watch;
+
+       /* check parameter */
+       if (event.itf != &afb_evt_event_itf) {
+               errno = EINVAL;
+               return -1;
+       }
+
+       /* search the existing watch */
+       watch = listener->watchs;
+       while(watch != NULL) {
+               if (watch->event == event.closure) {
+                       /* found: remove it */
+                       remove_watch(watch);
+                       break;
+               }
+               watch = watch->next_by_listener;
+       }
+       return 0;
+}
+
+
diff --git a/src/afb-evt.h b/src/afb-evt.h
new file mode 100644 (file)
index 0000000..8e10254
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2016 "IoT.bzh"
+ * Author: José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+struct afb_event;
+struct AFB_clientCtx;
+
+struct afb_evt_listener;
+
+extern struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, const char *event, struct json_object *object), void *closure);
+
+extern int afb_evt_broadcast(const char *event, struct json_object *object);
+
+extern struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener);
+extern void afb_evt_listener_unref(struct afb_evt_listener *listener);
+
+extern struct afb_event afb_evt_create_event(const char *name);
+
+extern int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event);
+extern int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event event);
+
index 4ca8441..1e38b41 100644 (file)
@@ -74,8 +74,9 @@ static void req_fail(struct afb_hreq *hreq, const char *status, const char *info
 static void req_success(struct afb_hreq *hreq, json_object *obj, const char *info);
 static const char *req_raw(struct afb_hreq *hreq, size_t *size);
 static void req_send(struct afb_hreq *hreq, const char *buffer, size_t size);
+static int req_subscribe_unsubscribe_error(struct afb_hreq *hreq, struct afb_event event);
 
-static const struct afb_req_itf afb_hreq_itf = {
+const struct afb_req_itf afb_hreq_req_itf = {
        .json = (void*)req_json,
        .get = (void*)req_get,
        .success = (void*)req_success,
@@ -87,7 +88,9 @@ static const struct afb_req_itf afb_hreq_itf = {
        .addref = (void*)afb_hreq_addref,
        .unref = (void*)afb_hreq_unref,
        .session_close = (void*)afb_context_close,
-       .session_set_LOA = (void*)afb_context_change_loa
+       .session_set_LOA = (void*)afb_context_change_loa,
+       .subscribe = (void*)req_subscribe_unsubscribe_error,
+       .unsubscribe = (void*)req_subscribe_unsubscribe_error
 };
 
 static struct hreq_data *get_data(struct afb_hreq *hreq, const char *key, int create)
@@ -697,7 +700,7 @@ int afb_hreq_post_add_file(struct afb_hreq *hreq, const char *key, const char *f
 
 struct afb_req afb_hreq_to_req(struct afb_hreq *hreq)
 {
-       return (struct afb_req){ .itf = &afb_hreq_itf, .closure = hreq };
+       return (struct afb_req){ .itf = &afb_hreq_req_itf, .closure = hreq };
 }
 
 static struct afb_arg req_get(struct afb_hreq *hreq, const char *name)
@@ -798,6 +801,12 @@ static void req_success(struct afb_hreq *hreq, json_object *obj, const char *inf
        req_reply(hreq, MHD_HTTP_OK, "success", info, obj);
 }
 
+static int req_subscribe_unsubscribe_error(struct afb_hreq *hreq, struct afb_event event)
+{
+       errno = EINVAL;
+       return -1;
+}
+
 int afb_hreq_init_context(struct afb_hreq *hreq)
 {
        const char *uuid;
index 836f470..772cd67 100644 (file)
@@ -21,6 +21,9 @@ struct AFB_clientCtx;
 struct json_object;
 struct hreq_data;
 struct afb_hsrv;
+struct afb_req_itf;
+
+extern const struct afb_req_itf afb_hreq_req_itf;
 
 struct afb_hreq {
        /*
index ffb6b81..5ef751f 100644 (file)
@@ -32,6 +32,7 @@
 #include <afb/afb-req-itf.h>
 #include "afb-apis.h"
 #include "afb-context.h"
+#include "afb-evt.h"
 #include "verbose.h"
 
 static void aws_on_hangup(struct afb_ws_json1 *ws, struct afb_wsj1 *wsj1);
@@ -50,22 +51,13 @@ struct afb_ws_json1
        void (*cleanup)(void*);
        void *cleanup_closure;
        struct AFB_clientCtx *session;
+       struct afb_evt_listener *listener;
        struct afb_wsj1 *wsj1;
        int new_session;
 };
 
 static void aws_send_event(struct afb_ws_json1 *ws, const char *event, struct json_object *object);
 
-static const struct afb_event_listener_itf event_listener_itf = {
-       .send = (void*)aws_send_event,
-       .expects = NULL
-};
-
-static inline struct afb_event_listener listener_for(struct afb_ws_json1 *aws)
-{
-       return (struct afb_event_listener){ .itf = &event_listener_itf, .closure = aws };
-}
-
 struct afb_ws_json1 *afb_ws_json1_create(int fd, struct afb_context *context, void (*cleanup)(void*), void *cleanup_closure)
 {
        struct afb_ws_json1 *result;
@@ -89,7 +81,8 @@ struct afb_ws_json1 *afb_ws_json1_create(int fd, struct afb_context *context, vo
        if (result->wsj1 == NULL)
                goto error3;
 
-       if (0 > ctxClientEventListenerAdd(result->session, listener_for(result)))
+       result->listener = afb_evt_listener_create((void*)aws_send_event, result);
+       if (result->listener == NULL)
                goto error4;
 
        return result;
@@ -114,7 +107,7 @@ static struct afb_ws_json1 *aws_addref(struct afb_ws_json1 *ws)
 static void aws_unref(struct afb_ws_json1 *ws)
 {
        if (--ws->refcount == 0) {
-               ctxClientEventListenerRemove(ws->session, listener_for(ws));
+               afb_evt_listener_unref(ws->listener);
                afb_wsj1_unref(ws->wsj1);
                if (ws->cleanup != NULL)
                        ws->cleanup(ws->cleanup_closure);
@@ -149,9 +142,10 @@ static void wsreq_fail(struct afb_wsreq *wsreq, const char *status, const char *
 static void wsreq_success(struct afb_wsreq *wsreq, struct json_object *obj, const char *info);
 static const char *wsreq_raw(struct afb_wsreq *wsreq, size_t *size);
 static void wsreq_send(struct afb_wsreq *wsreq, const char *buffer, size_t size);
+static int wsreq_subscribe(struct afb_wsreq *wsreq, struct afb_event event);
+static int wsreq_unsubscribe(struct afb_wsreq *wsreq, struct afb_event event);
 
-
-static const struct afb_req_itf wsreq_itf = {
+const struct afb_req_itf afb_ws_json1_req_itf = {
        .json = (void*)wsreq_json,
        .get = (void*)wsreq_get,
        .success = (void*)wsreq_success,
@@ -163,7 +157,9 @@ static const struct afb_req_itf wsreq_itf = {
        .addref = (void*)wsreq_addref,
        .unref = (void*)wsreq_unref,
        .session_close = (void*)afb_context_close,
-       .session_set_LOA = (void*)afb_context_change_loa
+       .session_set_LOA = (void*)afb_context_change_loa,
+       .subscribe = (void*)wsreq_subscribe,
+       .unsubscribe = (void*)wsreq_unsubscribe
 };
 
 static void aws_on_call(struct afb_ws_json1 *ws, const char *api, const char *verb, struct afb_wsj1_msg *msg)
@@ -197,7 +193,7 @@ static void aws_on_call(struct afb_ws_json1 *ws, const char *api, const char *ve
 
        /* emits the call */
        r.closure = wsreq;
-       r.itf = &wsreq_itf;
+       r.itf = &afb_ws_json1_req_itf;
        afb_apis_call_(r, &wsreq->context, api, verb);
        wsreq_unref(wsreq);
 }
@@ -276,3 +272,13 @@ static void aws_send_event(struct afb_ws_json1 *aws, const char *event, struct j
        afb_wsj1_send_event_j(aws->wsj1, event, afb_msg_json_event(event, object));
 }
 
+static int wsreq_subscribe(struct afb_wsreq *wsreq, struct afb_event event)
+{
+       return afb_evt_add_watch(wsreq->aws->listener, event);
+}
+
+static int wsreq_unsubscribe(struct afb_wsreq *wsreq, struct afb_event event)
+{
+       return afb_evt_remove_watch(wsreq->aws->listener, event);
+}
+
index fedbcf6..f714c22 100644 (file)
@@ -19,6 +19,9 @@
 
 struct afb_ws_json1;
 struct afb_context;
+struct afb_req_itf;
+
+extern const struct afb_req_itf afb_ws_json1_req_itf;
 
 extern struct afb_ws_json1 *afb_ws_json1_create(int fd, struct afb_context *context, void (*cleanup)(void*), void *closure);
 
index 16dc836..e0d0a8e 100644 (file)
@@ -38,13 +38,6 @@ struct client_value
        void (*free_value)(void*);
 };
 
-struct afb_event_listener_list
-{
-       struct afb_event_listener_list *next;
-       struct afb_event_listener listener;
-       int refcount;
-};
-
 struct AFB_clientCtx
 {
        unsigned refcount;
@@ -54,7 +47,6 @@ struct AFB_clientCtx
        char uuid[37];        // long term authentication of remote client
        char token[37];       // short term authentication of remote client
        struct client_value *values;
-       struct afb_event_listener_list *listeners;
 };
 
 // Session UUID are store in a simple array [for 10 sessions this should be enough]
@@ -66,7 +58,6 @@ static struct {
   int timeout;
   int apicount;
   char initok[37];
-  struct afb_event_listener_list *listeners;
 } sessions;
 
 /* generate a uuid */
@@ -289,8 +280,6 @@ void ctxClientClose (struct AFB_clientCtx *clientCtx)
        if (clientCtx->uuid[0] != 0) {
                clientCtx->uuid[0] = 0;
                ctxUuidFreeCB (clientCtx);
-               while(clientCtx->listeners != NULL)
-                       ctxClientEventListenerRemove(clientCtx, clientCtx->listeners->listener);
                        if (clientCtx->refcount == 0) {
                        ctxStoreDel (clientCtx);
                        free(clientCtx);
@@ -326,103 +315,6 @@ void ctxTokenNew (struct AFB_clientCtx *clientCtx)
        clientCtx->expiration = NOW + sessions.timeout;
 }
 
-static int add_listener(struct afb_event_listener_list **head, struct afb_event_listener listener)
-{
-       struct afb_event_listener_list *iter, **prv;
-
-       prv = head;
-       for (;;) {
-               iter = *prv;
-               if (iter == NULL) {
-                       iter = calloc(1, sizeof *iter);
-                       if (iter == NULL) {
-                               errno = ENOMEM;
-                               return -1;
-                       }
-                       iter->listener = listener;
-                       iter->refcount = 1;
-                       *prv = iter;
-                       return 0;
-               }
-               if (iter->listener.itf == listener.itf && iter->listener.closure == listener.closure) {
-                       iter->refcount++;
-                       return 0;
-               }
-               prv = &iter->next;
-       }
-}
-
-int ctxClientEventListenerAdd(struct AFB_clientCtx *clientCtx, struct afb_event_listener listener)
-{
-       return add_listener(clientCtx != NULL ? &clientCtx->listeners : &sessions.listeners, listener);
-}
-
-static void remove_listener(struct afb_event_listener_list **head, struct afb_event_listener listener)
-{
-       struct afb_event_listener_list *iter, **prv;
-
-       prv = head;
-       for (;;) {
-               iter = *prv;
-               if (iter == NULL)
-                       return;
-               if (iter->listener.itf == listener.itf && iter->listener.closure == listener.closure) {
-                       if (!--iter->refcount) {
-                               *prv = iter->next;
-                               free(iter);
-                       }
-                       return;
-               }
-               prv = &iter->next;
-       }
-}
-
-void ctxClientEventListenerRemove(struct AFB_clientCtx *clientCtx, struct afb_event_listener listener)
-{
-       remove_listener(clientCtx != NULL ? &clientCtx->listeners : &sessions.listeners, listener);
-}
-
-static int send(struct afb_event_listener_list *head, const char *event, struct json_object *object)
-{
-       struct afb_event_listener_list *iter;
-       int result;
-
-       result = 0;
-       iter = head;
-       while (iter != NULL) {
-               if (iter->listener.itf->expects == NULL || iter->listener.itf->expects(iter->listener.closure, event)) {
-                       iter->listener.itf->send(iter->listener.closure, event, json_object_get(object));
-                       result++;
-               }
-               iter = iter->next;
-       }
-
-       return result;
-}
-
-int ctxClientEventSend(struct AFB_clientCtx *clientCtx, const char *event, struct json_object *object)
-{
-       long idx;
-       time_t now;
-       int result;
-
-       now = NOW;
-       if (clientCtx != NULL) {
-               result = ctxIsActive(clientCtx, now) ? send(clientCtx->listeners, event, object) : 0;
-       } else {
-               result = send(sessions.listeners, event, object);
-               for (idx=0; idx < sessions.max; idx++) {
-                       clientCtx = ctxClientAddRef(sessions.store[idx]);
-                       if (clientCtx != NULL && ctxIsActive(clientCtx, now)) {
-                               clientCtx = ctxClientAddRef(clientCtx);
-                               result += send(clientCtx->listeners, event, object);
-                       }
-                       ctxClientUnref(clientCtx);
-               }
-       }
-       return result;
-}
-
 const char *ctxClientGetUuid (struct AFB_clientCtx *clientCtx)
 {
        assert(clientCtx != NULL);
index af07410..497951a 100644 (file)
 struct json_object;
 struct AFB_clientCtx;
 
-struct afb_event_listener_itf
-{
-       void (*send)(void *closure, const char *event, struct json_object *object);
-       int (*expects)(void *closure, const char *event);
-};
-
-struct afb_event_listener
-{
-       const struct afb_event_listener_itf *itf;
-       void *closure;
-};
-
 extern void ctxStoreInit (int max_session_count, int timeout, const char *initok, int context_count);
 
 extern struct AFB_clientCtx *ctxClientGetSession (const char *uuid, int *created);
@@ -39,10 +27,6 @@ extern struct AFB_clientCtx *ctxClientAddRef(struct AFB_clientCtx *clientCtx);
 extern void ctxClientUnref(struct AFB_clientCtx *clientCtx);
 extern void ctxClientClose (struct AFB_clientCtx *clientCtx);
 
-extern int ctxClientEventListenerAdd(struct AFB_clientCtx *clientCtx, struct afb_event_listener listener);
-extern void ctxClientEventListenerRemove(struct AFB_clientCtx *clientCtx, struct afb_event_listener listener);
-extern int ctxClientEventSend(struct AFB_clientCtx *clientCtx, const char *event, struct json_object *object);
-
 extern int ctxTokenCheck (struct AFB_clientCtx *clientCtx, const char *token);
 extern void ctxTokenNew (struct AFB_clientCtx *clientCtx);