evt: handles broadcasting and tracking 15/5915/1
authorJosé Bollo <jose.bollo@iot.bzh>
Fri, 17 Jun 2016 20:31:33 +0000 (22:31 +0200)
committerJosé Bollo <jose.bollo@iot.bzh>
Thu, 23 Jun 2016 09:04:23 +0000 (11:04 +0200)
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
src/afb-api-dbus.c
src/afb-evt.c
src/afb-evt.h
src/afb-svc.c
src/afb-ws-json1.c

index 3d9da07..054755a 100644 (file)
@@ -537,7 +537,7 @@ static int api_dbus_server_on_object_called(sd_bus_message *message, void *userd
        return 1;
 }
 
-static void afb_api_dbus_server_send_event(struct api_dbus *api, const char *event, struct json_object *object)
+static void afb_api_dbus_server_send_event(struct api_dbus *api, const char *event, int eventid, struct json_object *object)
 {
        int rc;
 
@@ -548,6 +548,12 @@ static void afb_api_dbus_server_send_event(struct api_dbus *api, const char *eve
        json_object_put(object);
 }
 
+/* the interface for events */
+static const struct afb_evt_itf evt_itf = {
+       .broadcast = (void*)afb_api_dbus_server_send_event,
+       .push = (void*)afb_api_dbus_server_send_event
+};
+
 /* create the service */
 int afb_api_dbus_add_server(const char *path)
 {
@@ -576,7 +582,7 @@ int afb_api_dbus_add_server(const char *path)
        }
        INFO("afb service over dbus installed, name %s, path %s", api->name, api->path);
 
-       api->listener = afb_evt_listener_create((void*)afb_api_dbus_server_send_event, api);
+       api->listener = afb_evt_listener_create(&evt_itf, api);
 
        return 0;
 error3:
index 53ab0e0..4924b8c 100644 (file)
@@ -38,8 +38,8 @@ struct afb_evt_listener {
        /* chaining listeners */
        struct afb_evt_listener *next;
 
-       /* callback on event */
-       void (*send)(void *closure, const char *event, struct json_object *object);
+       /* interface for callbacks */
+       const struct afb_evt_itf *itf;
 
        /* closure for the callback */
        void *closure;
@@ -56,9 +56,15 @@ struct afb_evt_listener {
  */
 struct afb_evt_event {
 
+       /* next event */
+       struct afb_evt_event *next;
+
        /* head of the list of listeners watching the event */
        struct afb_evt_watch *watchs;
 
+       /* id of the event */
+       int id;
+
        /* name of the event */
        char name[1];
 };
@@ -79,6 +85,9 @@ struct afb_evt_watch {
 
        /* link to the next event for the same listener */
        struct afb_evt_watch *next_by_listener;
+
+       /* activity */
+       unsigned activity;
 };
 
 /* declare functions */
@@ -96,6 +105,11 @@ static struct afb_event_itf afb_evt_event_itf = {
 /* head of the list of listeners */
 static struct afb_evt_listener *listeners = NULL;
 
+/* handling id of events */
+static struct afb_evt_event *events = NULL;
+static int event_id_counter = 0;
+static int event_id_wrapped = 0;
+
 /*
  * Broadcasts the event 'evt' with its 'object'
  * 'object' is released (like json_object_put)
@@ -119,9 +133,11 @@ int afb_evt_broadcast(const char *event, struct json_object *object)
        result = 0;
        listener = listeners;
        while(listener) {
-               listener->send(listener->closure, event, json_object_get(object));
+               if (listener->itf->broadcast != NULL) {
+                       listener->itf->broadcast(listener->closure, event, 0, json_object_get(object));
+                       result++;
+               }
                listener = listener->next;
-               result++;
        }
        json_object_put(object);
        return result;
@@ -140,9 +156,11 @@ static int evt_push(struct afb_evt_event *evt, struct json_object *obj)
 
        result = 0;
        watch = evt->watchs;
-       while(listener) {
+       while(watch) {
                listener = watch->listener;
-               listener->send(listener->closure, evt->name, json_object_get(obj));
+               assert(listener->itf->push != NULL);
+               if (watch->activity != 0)
+                       listener->itf->push(listener->closure, evt->name, evt->id, json_object_get(obj));
                watch = watch->next_by_event;
                result++;
        }
@@ -156,15 +174,23 @@ static int evt_push(struct afb_evt_event *evt, struct json_object *obj)
 static void remove_watch(struct afb_evt_watch *watch)
 {
        struct afb_evt_watch **prv;
+       struct afb_evt_event *evt;
+       struct afb_evt_listener *listener;
+
+       /* notify listener if needed */
+       evt = watch->event;
+       listener = watch->listener;
+       if (watch->activity != 0 && listener->itf->remove != NULL)
+               listener->itf->remove(listener->closure, evt->name, evt->id);
 
        /* unlink the watch for its event */
-       prv = &watch->event->watchs;
+       prv = &evt->watchs;
        while(*prv != watch)
                prv = &(*prv)->next_by_event;
        *prv = watch->next_by_event;
 
        /* unlink the watch for its listener */
-       prv = &watch->listener->watchs;
+       prv = &listener->watchs;
        while(*prv != watch)
                prv = &(*prv)->next_by_listener;
        *prv = watch->next_by_listener;
@@ -178,11 +204,26 @@ static void remove_watch(struct afb_evt_watch *watch)
  */
 static void evt_destroy(struct afb_evt_event *evt)
 {
+       struct afb_evt_event **prv;
        if (evt != NULL) {
-               /* removes all watchers */
-               while(evt->watchs != NULL)
-                       remove_watch(evt->watchs);
-               free(evt);
+               /* removes the event if valid! */
+               prv = &events;
+               while (*prv != NULL) {
+                       if (*prv != evt)
+                               prv = &(*prv)->next;
+                       else {
+                               /* valid, unlink */
+                               *prv = evt->next;
+
+                               /* removes all watchers */
+                               while(evt->watchs != NULL)
+                                       remove_watch(evt->watchs);
+
+                               /* free */
+                               free(evt);
+                               break;
+                       }
+               }
        }
 }
 
@@ -195,13 +236,37 @@ struct afb_event afb_evt_create_event(const char *name)
        size_t len;
        struct afb_evt_event *evt;
 
+       /* allocates the id */
+       do {
+               if (++event_id_counter < 0) {
+                       event_id_wrapped = 1;
+                       event_id_counter = 1024; /* heuristic: small numbers are not destroyed */
+               }
+               if (!event_id_wrapped)
+                       break;
+               evt = events;
+               while(evt != NULL && evt->id != event_id_counter)
+                       evt = evt->next;
+       } while (evt != NULL);
+
+       /* allocates the event */
        len = strlen(name);
        evt = malloc(len + sizeof * evt);
-       if (evt != NULL) {
-               evt->watchs = NULL;
-               memcpy(evt->name, name, len + 1);
-       }
+       if (evt == NULL)
+               goto error;
+
+       /* initialize the event */
+       evt->next = events;
+       evt->watchs = NULL;
+       evt->id = event_id_counter;
+       assert(evt->id > 0);
+       memcpy(evt->name, name, len + 1);
+       events = evt;
+
+       /* returns the event */
        return (struct afb_event){ .itf = &afb_evt_event_itf, .closure = evt };
+error:
+       return (struct afb_event){ .itf = NULL, .closure = NULL };
 }
 
 /*
@@ -212,19 +277,27 @@ const char *afb_evt_event_name(struct afb_event event)
        return (event.itf != &afb_evt_event_itf) ? NULL : ((struct afb_evt_event *)event.closure)->name;
 }
 
+/*
+ * Returns the id of the 'event'
+ */
+int afb_evt_event_id(struct afb_event event)
+{
+       return (event.itf != &afb_evt_event_itf) ? 0 : ((struct afb_evt_event *)event.closure)->id;
+}
+
 /*
  * Returns an instance of the listener defined by the 'send' callback
  * and the 'closure'.
  * Returns NULL in case of memory depletion.
  */
-struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, const char *event, struct json_object *object), void *closure)
+struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, void *closure)
 {
        struct afb_evt_listener *listener;
 
        /* search if an instance already exists */
        listener = listeners;
        while (listener != NULL) {
-               if (listener->send == send && listener->closure == closure)
+               if (listener->itf == itf && listener->closure == closure)
                        return afb_evt_listener_addref(listener);
                listener = listener->next;
        }
@@ -234,7 +307,7 @@ struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, con
        if (listener != NULL) {
                /* init */
                listener->next = listeners;
-               listener->send = send;
+               listener->itf = itf;
                listener->closure = closure;
                listener->watchs = NULL;
                listener->refcount = 1;
@@ -286,16 +359,17 @@ int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event)
        struct afb_evt_event *evt;
 
        /* check parameter */
-       if (event.itf != &afb_evt_event_itf) {
+       if (event.itf != &afb_evt_event_itf || listener->itf->push == NULL) {
                errno = EINVAL;
                return -1;
        }
 
-       /* search the existing watch */
+       /* search the existing watch for the listener */
+       evt = event.closure;
        watch = listener->watchs;
        while(watch != NULL) {
-               if (watch->event == event.closure)
-                       return 0;
+               if (watch->event == evt)
+                       goto found;
                watch = watch->next_by_listener;
        }
 
@@ -307,14 +381,19 @@ int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event)
        }
 
        /* initialise and link */
-       evt = event.closure;
        watch->event = evt;
        watch->next_by_event = evt->watchs;
        watch->listener = listener;
        watch->next_by_listener = listener->watchs;
+       watch->activity = 0;
        evt->watchs = watch;
        listener->watchs = watch;
-       
+
+found:
+       if (watch->activity == 0 && listener->itf->add != NULL)
+               listener->itf->add(listener->closure, evt->name, evt->id);
+       watch->activity++;
+
        return 0;
 }
 
@@ -325,6 +404,7 @@ int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event)
 int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event event)
 {
        struct afb_evt_watch *watch;
+       struct afb_evt_event *evt;
 
        /* check parameter */
        if (event.itf != &afb_evt_event_itf) {
@@ -333,16 +413,21 @@ int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event eve
        }
 
        /* search the existing watch */
+       evt = event.closure;
        watch = listener->watchs;
        while(watch != NULL) {
-               if (watch->event == event.closure) {
+               if (watch->event == evt) {
                        /* found: remove it */
-                       remove_watch(watch);
-                       break;
+                       if (watch->activity != 0) {
+                               watch->activity--;
+                               if (watch->activity == 0 && listener->itf->remove != NULL)
+                                       listener->itf->remove(listener->closure, evt->name, evt->id);
+                       }
+                       return 0;
                }
                watch = watch->next_by_listener;
        }
-       return 0;
+       errno = ENOENT;
+       return -1;
 }
 
-
index 157a777..8ebb2ec 100644 (file)
@@ -22,7 +22,15 @@ struct AFB_clientCtx;
 
 struct afb_evt_listener;
 
-extern struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, const char *event, struct json_object *object), void *closure);
+struct afb_evt_itf
+{
+       void (*push)(void *closure, const char *event, int eventid, struct json_object *object);
+       void (*broadcast)(void *closure, const char *event, int eventid, struct json_object *object);
+       void (*add)(void *closure, const char *event, int eventid);
+       void (*remove)(void *closure, const char *event, int eventid);
+};
+
+extern struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, void *closure);
 
 extern int afb_evt_broadcast(const char *event, struct json_object *object);
 
@@ -31,6 +39,7 @@ extern void afb_evt_listener_unref(struct afb_evt_listener *listener);
 
 extern struct afb_event afb_evt_create_event(const char *name);
 extern const char *afb_evt_event_name(struct afb_event event);
+extern int afb_evt_event_id(struct afb_event event);
 
 extern int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event);
 extern int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event event);
index 03ff4b8..95617d5 100644 (file)
@@ -64,7 +64,7 @@ struct svc_req
 };
 
 /* functions for services */
-static void svc_on_event(struct afb_svc *svc, const char *event, struct json_object *object);
+static void svc_on_event(struct afb_svc *svc, const char *event, int eventid, struct json_object *object);
 static void svc_call(struct afb_svc *svc, const char *api, const char *verb, struct json_object *args,
                                void (*callback)(void*, int, struct json_object*), void *closure);
 
@@ -73,6 +73,12 @@ static const struct afb_service_itf service_itf = {
        .call = (void*)svc_call
 };
 
+/* the interface for events */
+static const struct afb_evt_itf evt_itf = {
+       .broadcast = (void*)svc_on_event,
+       .push = (void*)svc_on_event
+};
+
 /* functions for requests of services */
 static void svcreq_addref(struct svc_req *svcreq);
 static void svcreq_unref(struct svc_req *svcreq);
@@ -130,7 +136,7 @@ struct afb_svc *afb_svc_create(int share_session, int (*init)(struct afb_service
        if (on_event == NULL)
                svc->listener = NULL;
        else {
-               svc->listener = afb_evt_listener_create((void*)svc_on_event, svc);
+               svc->listener = afb_evt_listener_create(&evt_itf, svc);
                if (svc->listener == NULL)
                        goto error3;
        }
@@ -156,7 +162,7 @@ error:
 /*
  * Propagates the event to the service
  */
-static void svc_on_event(struct afb_svc *svc, const char *event, struct json_object *object)
+static void svc_on_event(struct afb_svc *svc, const char *event, int eventid, struct json_object *object)
 {
        svc->on_event(event, object);
 }
index 4cfc918..9d295e7 100644 (file)
@@ -44,7 +44,7 @@ struct afb_wsreq;
 /* predeclaration of websocket callbacks */
 static void aws_on_hangup(struct afb_ws_json1 *ws, struct afb_wsj1 *wsj1);
 static void aws_on_call(struct afb_ws_json1 *ws, const char *api, const char *verb, struct afb_wsj1_msg *msg);
-static void aws_on_event(struct afb_ws_json1 *ws, const char *event, struct json_object *object);
+static void aws_on_event(struct afb_ws_json1 *ws, const char *event, int eventid, struct json_object *object);
 
 /* predeclaration of wsreq callbacks */
 static void wsreq_addref(struct afb_wsreq *wsreq);
@@ -110,6 +110,12 @@ const struct afb_req_itf afb_ws_json1_req_itf = {
        .subcall = (void*)wsreq_subcall
 };
 
+/* the interface for events */
+static const struct afb_evt_itf evt_itf = {
+       .broadcast = (void*)aws_on_event,
+       .push = (void*)aws_on_event
+};
+
 /***************************************************************
 ****************************************************************
 **
@@ -141,7 +147,7 @@ struct afb_ws_json1 *afb_ws_json1_create(int fd, struct afb_context *context, vo
        if (result->wsj1 == NULL)
                goto error3;
 
-       result->listener = afb_evt_listener_create((void*)aws_on_event, result);
+       result->listener = afb_evt_listener_create(&evt_itf, result);
        if (result->listener == NULL)
                goto error4;
 
@@ -217,7 +223,7 @@ static void aws_on_call(struct afb_ws_json1 *ws, const char *api, const char *ve
        wsreq_unref(wsreq);
 }
 
-static void aws_on_event(struct afb_ws_json1 *aws, const char *event, struct json_object *object)
+static void aws_on_event(struct afb_ws_json1 *aws, const char *event, int eventid, struct json_object *object)
 {
        afb_wsj1_send_event_j(aws->wsj1, event, afb_msg_json_event(event, object));
 }