From d8ef25780bffa6f91f013ef71b1ede908325e59d Mon Sep 17 00:00:00 2001 From: =?utf8?q?Jos=C3=A9=20Bollo?= Date: Fri, 17 Jun 2016 22:31:33 +0200 Subject: [PATCH] evt: handles broadcasting and tracking MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: José Bollo --- src/afb-api-dbus.c | 10 +++- src/afb-evt.c | 145 ++++++++++++++++++++++++++++++++++++++++++----------- src/afb-evt.h | 11 +++- src/afb-svc.c | 12 +++-- src/afb-ws-json1.c | 12 +++-- 5 files changed, 151 insertions(+), 39 deletions(-) diff --git a/src/afb-api-dbus.c b/src/afb-api-dbus.c index 3d9da07f..054755ac 100644 --- a/src/afb-api-dbus.c +++ b/src/afb-api-dbus.c @@ -537,7 +537,7 @@ static int api_dbus_server_on_object_called(sd_bus_message *message, void *userd return 1; } -static void afb_api_dbus_server_send_event(struct api_dbus *api, const char *event, struct json_object *object) +static void afb_api_dbus_server_send_event(struct api_dbus *api, const char *event, int eventid, struct json_object *object) { int rc; @@ -548,6 +548,12 @@ static void afb_api_dbus_server_send_event(struct api_dbus *api, const char *eve json_object_put(object); } +/* the interface for events */ +static const struct afb_evt_itf evt_itf = { + .broadcast = (void*)afb_api_dbus_server_send_event, + .push = (void*)afb_api_dbus_server_send_event +}; + /* create the service */ int afb_api_dbus_add_server(const char *path) { @@ -576,7 +582,7 @@ int afb_api_dbus_add_server(const char *path) } INFO("afb service over dbus installed, name %s, path %s", api->name, api->path); - api->listener = afb_evt_listener_create((void*)afb_api_dbus_server_send_event, api); + api->listener = afb_evt_listener_create(&evt_itf, api); return 0; error3: diff --git a/src/afb-evt.c b/src/afb-evt.c index 53ab0e0b..4924b8cb 100644 --- a/src/afb-evt.c +++ b/src/afb-evt.c @@ -38,8 +38,8 @@ struct afb_evt_listener { /* chaining listeners */ struct afb_evt_listener *next; - /* callback on event */ - void (*send)(void *closure, const char *event, struct json_object *object); + /* interface for callbacks */ + const struct afb_evt_itf *itf; /* closure for the callback */ void *closure; @@ -56,9 +56,15 @@ struct afb_evt_listener { */ struct afb_evt_event { + /* next event */ + struct afb_evt_event *next; + /* head of the list of listeners watching the event */ struct afb_evt_watch *watchs; + /* id of the event */ + int id; + /* name of the event */ char name[1]; }; @@ -79,6 +85,9 @@ struct afb_evt_watch { /* link to the next event for the same listener */ struct afb_evt_watch *next_by_listener; + + /* activity */ + unsigned activity; }; /* declare functions */ @@ -96,6 +105,11 @@ static struct afb_event_itf afb_evt_event_itf = { /* head of the list of listeners */ static struct afb_evt_listener *listeners = NULL; +/* handling id of events */ +static struct afb_evt_event *events = NULL; +static int event_id_counter = 0; +static int event_id_wrapped = 0; + /* * Broadcasts the event 'evt' with its 'object' * 'object' is released (like json_object_put) @@ -119,9 +133,11 @@ int afb_evt_broadcast(const char *event, struct json_object *object) result = 0; listener = listeners; while(listener) { - listener->send(listener->closure, event, json_object_get(object)); + if (listener->itf->broadcast != NULL) { + listener->itf->broadcast(listener->closure, event, 0, json_object_get(object)); + result++; + } listener = listener->next; - result++; } json_object_put(object); return result; @@ -140,9 +156,11 @@ static int evt_push(struct afb_evt_event *evt, struct json_object *obj) result = 0; watch = evt->watchs; - while(listener) { + while(watch) { listener = watch->listener; - listener->send(listener->closure, evt->name, json_object_get(obj)); + assert(listener->itf->push != NULL); + if (watch->activity != 0) + listener->itf->push(listener->closure, evt->name, evt->id, json_object_get(obj)); watch = watch->next_by_event; result++; } @@ -156,15 +174,23 @@ static int evt_push(struct afb_evt_event *evt, struct json_object *obj) static void remove_watch(struct afb_evt_watch *watch) { struct afb_evt_watch **prv; + struct afb_evt_event *evt; + struct afb_evt_listener *listener; + + /* notify listener if needed */ + evt = watch->event; + listener = watch->listener; + if (watch->activity != 0 && listener->itf->remove != NULL) + listener->itf->remove(listener->closure, evt->name, evt->id); /* unlink the watch for its event */ - prv = &watch->event->watchs; + prv = &evt->watchs; while(*prv != watch) prv = &(*prv)->next_by_event; *prv = watch->next_by_event; /* unlink the watch for its listener */ - prv = &watch->listener->watchs; + prv = &listener->watchs; while(*prv != watch) prv = &(*prv)->next_by_listener; *prv = watch->next_by_listener; @@ -178,11 +204,26 @@ static void remove_watch(struct afb_evt_watch *watch) */ static void evt_destroy(struct afb_evt_event *evt) { + struct afb_evt_event **prv; if (evt != NULL) { - /* removes all watchers */ - while(evt->watchs != NULL) - remove_watch(evt->watchs); - free(evt); + /* removes the event if valid! */ + prv = &events; + while (*prv != NULL) { + if (*prv != evt) + prv = &(*prv)->next; + else { + /* valid, unlink */ + *prv = evt->next; + + /* removes all watchers */ + while(evt->watchs != NULL) + remove_watch(evt->watchs); + + /* free */ + free(evt); + break; + } + } } } @@ -195,13 +236,37 @@ struct afb_event afb_evt_create_event(const char *name) size_t len; struct afb_evt_event *evt; + /* allocates the id */ + do { + if (++event_id_counter < 0) { + event_id_wrapped = 1; + event_id_counter = 1024; /* heuristic: small numbers are not destroyed */ + } + if (!event_id_wrapped) + break; + evt = events; + while(evt != NULL && evt->id != event_id_counter) + evt = evt->next; + } while (evt != NULL); + + /* allocates the event */ len = strlen(name); evt = malloc(len + sizeof * evt); - if (evt != NULL) { - evt->watchs = NULL; - memcpy(evt->name, name, len + 1); - } + if (evt == NULL) + goto error; + + /* initialize the event */ + evt->next = events; + evt->watchs = NULL; + evt->id = event_id_counter; + assert(evt->id > 0); + memcpy(evt->name, name, len + 1); + events = evt; + + /* returns the event */ return (struct afb_event){ .itf = &afb_evt_event_itf, .closure = evt }; +error: + return (struct afb_event){ .itf = NULL, .closure = NULL }; } /* @@ -212,19 +277,27 @@ const char *afb_evt_event_name(struct afb_event event) return (event.itf != &afb_evt_event_itf) ? NULL : ((struct afb_evt_event *)event.closure)->name; } +/* + * Returns the id of the 'event' + */ +int afb_evt_event_id(struct afb_event event) +{ + return (event.itf != &afb_evt_event_itf) ? 0 : ((struct afb_evt_event *)event.closure)->id; +} + /* * Returns an instance of the listener defined by the 'send' callback * and the 'closure'. * Returns NULL in case of memory depletion. */ -struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, const char *event, struct json_object *object), void *closure) +struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, void *closure) { struct afb_evt_listener *listener; /* search if an instance already exists */ listener = listeners; while (listener != NULL) { - if (listener->send == send && listener->closure == closure) + if (listener->itf == itf && listener->closure == closure) return afb_evt_listener_addref(listener); listener = listener->next; } @@ -234,7 +307,7 @@ struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, con if (listener != NULL) { /* init */ listener->next = listeners; - listener->send = send; + listener->itf = itf; listener->closure = closure; listener->watchs = NULL; listener->refcount = 1; @@ -286,16 +359,17 @@ int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event) struct afb_evt_event *evt; /* check parameter */ - if (event.itf != &afb_evt_event_itf) { + if (event.itf != &afb_evt_event_itf || listener->itf->push == NULL) { errno = EINVAL; return -1; } - /* search the existing watch */ + /* search the existing watch for the listener */ + evt = event.closure; watch = listener->watchs; while(watch != NULL) { - if (watch->event == event.closure) - return 0; + if (watch->event == evt) + goto found; watch = watch->next_by_listener; } @@ -307,14 +381,19 @@ int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event) } /* initialise and link */ - evt = event.closure; watch->event = evt; watch->next_by_event = evt->watchs; watch->listener = listener; watch->next_by_listener = listener->watchs; + watch->activity = 0; evt->watchs = watch; listener->watchs = watch; - + +found: + if (watch->activity == 0 && listener->itf->add != NULL) + listener->itf->add(listener->closure, evt->name, evt->id); + watch->activity++; + return 0; } @@ -325,6 +404,7 @@ int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event) int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event event) { struct afb_evt_watch *watch; + struct afb_evt_event *evt; /* check parameter */ if (event.itf != &afb_evt_event_itf) { @@ -333,16 +413,21 @@ int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event eve } /* search the existing watch */ + evt = event.closure; watch = listener->watchs; while(watch != NULL) { - if (watch->event == event.closure) { + if (watch->event == evt) { /* found: remove it */ - remove_watch(watch); - break; + if (watch->activity != 0) { + watch->activity--; + if (watch->activity == 0 && listener->itf->remove != NULL) + listener->itf->remove(listener->closure, evt->name, evt->id); + } + return 0; } watch = watch->next_by_listener; } - return 0; + errno = ENOENT; + return -1; } - diff --git a/src/afb-evt.h b/src/afb-evt.h index 157a7776..8ebb2ec0 100644 --- a/src/afb-evt.h +++ b/src/afb-evt.h @@ -22,7 +22,15 @@ struct AFB_clientCtx; struct afb_evt_listener; -extern struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, const char *event, struct json_object *object), void *closure); +struct afb_evt_itf +{ + void (*push)(void *closure, const char *event, int eventid, struct json_object *object); + void (*broadcast)(void *closure, const char *event, int eventid, struct json_object *object); + void (*add)(void *closure, const char *event, int eventid); + void (*remove)(void *closure, const char *event, int eventid); +}; + +extern struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, void *closure); extern int afb_evt_broadcast(const char *event, struct json_object *object); @@ -31,6 +39,7 @@ extern void afb_evt_listener_unref(struct afb_evt_listener *listener); extern struct afb_event afb_evt_create_event(const char *name); extern const char *afb_evt_event_name(struct afb_event event); +extern int afb_evt_event_id(struct afb_event event); extern int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event); extern int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event event); diff --git a/src/afb-svc.c b/src/afb-svc.c index 03ff4b84..95617d5d 100644 --- a/src/afb-svc.c +++ b/src/afb-svc.c @@ -64,7 +64,7 @@ struct svc_req }; /* functions for services */ -static void svc_on_event(struct afb_svc *svc, const char *event, struct json_object *object); +static void svc_on_event(struct afb_svc *svc, const char *event, int eventid, struct json_object *object); static void svc_call(struct afb_svc *svc, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *closure); @@ -73,6 +73,12 @@ static const struct afb_service_itf service_itf = { .call = (void*)svc_call }; +/* the interface for events */ +static const struct afb_evt_itf evt_itf = { + .broadcast = (void*)svc_on_event, + .push = (void*)svc_on_event +}; + /* functions for requests of services */ static void svcreq_addref(struct svc_req *svcreq); static void svcreq_unref(struct svc_req *svcreq); @@ -130,7 +136,7 @@ struct afb_svc *afb_svc_create(int share_session, int (*init)(struct afb_service if (on_event == NULL) svc->listener = NULL; else { - svc->listener = afb_evt_listener_create((void*)svc_on_event, svc); + svc->listener = afb_evt_listener_create(&evt_itf, svc); if (svc->listener == NULL) goto error3; } @@ -156,7 +162,7 @@ error: /* * Propagates the event to the service */ -static void svc_on_event(struct afb_svc *svc, const char *event, struct json_object *object) +static void svc_on_event(struct afb_svc *svc, const char *event, int eventid, struct json_object *object) { svc->on_event(event, object); } diff --git a/src/afb-ws-json1.c b/src/afb-ws-json1.c index 4cfc9181..9d295e78 100644 --- a/src/afb-ws-json1.c +++ b/src/afb-ws-json1.c @@ -44,7 +44,7 @@ struct afb_wsreq; /* predeclaration of websocket callbacks */ static void aws_on_hangup(struct afb_ws_json1 *ws, struct afb_wsj1 *wsj1); static void aws_on_call(struct afb_ws_json1 *ws, const char *api, const char *verb, struct afb_wsj1_msg *msg); -static void aws_on_event(struct afb_ws_json1 *ws, const char *event, struct json_object *object); +static void aws_on_event(struct afb_ws_json1 *ws, const char *event, int eventid, struct json_object *object); /* predeclaration of wsreq callbacks */ static void wsreq_addref(struct afb_wsreq *wsreq); @@ -110,6 +110,12 @@ const struct afb_req_itf afb_ws_json1_req_itf = { .subcall = (void*)wsreq_subcall }; +/* the interface for events */ +static const struct afb_evt_itf evt_itf = { + .broadcast = (void*)aws_on_event, + .push = (void*)aws_on_event +}; + /*************************************************************** **************************************************************** ** @@ -141,7 +147,7 @@ struct afb_ws_json1 *afb_ws_json1_create(int fd, struct afb_context *context, vo if (result->wsj1 == NULL) goto error3; - result->listener = afb_evt_listener_create((void*)aws_on_event, result); + result->listener = afb_evt_listener_create(&evt_itf, result); if (result->listener == NULL) goto error4; @@ -217,7 +223,7 @@ static void aws_on_call(struct afb_ws_json1 *ws, const char *api, const char *ve wsreq_unref(wsreq); } -static void aws_on_event(struct afb_ws_json1 *aws, const char *event, struct json_object *object) +static void aws_on_event(struct afb_ws_json1 *aws, const char *event, int eventid, struct json_object *object) { afb_wsj1_send_event_j(aws->wsj1, event, afb_msg_json_event(event, object)); } -- 2.16.6