/* chaining listeners */
struct afb_evt_listener *next;
- /* callback on event */
- void (*send)(void *closure, const char *event, struct json_object *object);
+ /* interface for callbacks */
+ const struct afb_evt_itf *itf;
/* closure for the callback */
void *closure;
*/
struct afb_evt_event {
+ /* next event */
+ struct afb_evt_event *next;
+
/* head of the list of listeners watching the event */
struct afb_evt_watch *watchs;
+ /* id of the event */
+ int id;
+
/* name of the event */
char name[1];
};
/* link to the next event for the same listener */
struct afb_evt_watch *next_by_listener;
+
+ /* activity */
+ unsigned activity;
};
/* declare functions */
/* head of the list of listeners */
static struct afb_evt_listener *listeners = NULL;
+/* handling id of events */
+static struct afb_evt_event *events = NULL;
+static int event_id_counter = 0;
+static int event_id_wrapped = 0;
+
/*
* Broadcasts the event 'evt' with its 'object'
* 'object' is released (like json_object_put)
result = 0;
listener = listeners;
while(listener) {
- listener->send(listener->closure, event, json_object_get(object));
+ if (listener->itf->broadcast != NULL) {
+ listener->itf->broadcast(listener->closure, event, 0, json_object_get(object));
+ result++;
+ }
listener = listener->next;
- result++;
}
json_object_put(object);
return result;
result = 0;
watch = evt->watchs;
- while(listener) {
+ while(watch) {
listener = watch->listener;
- listener->send(listener->closure, evt->name, json_object_get(obj));
+ assert(listener->itf->push != NULL);
+ if (watch->activity != 0)
+ listener->itf->push(listener->closure, evt->name, evt->id, json_object_get(obj));
watch = watch->next_by_event;
result++;
}
static void remove_watch(struct afb_evt_watch *watch)
{
struct afb_evt_watch **prv;
+ struct afb_evt_event *evt;
+ struct afb_evt_listener *listener;
+
+ /* notify listener if needed */
+ evt = watch->event;
+ listener = watch->listener;
+ if (watch->activity != 0 && listener->itf->remove != NULL)
+ listener->itf->remove(listener->closure, evt->name, evt->id);
/* unlink the watch for its event */
- prv = &watch->event->watchs;
+ prv = &evt->watchs;
while(*prv != watch)
prv = &(*prv)->next_by_event;
*prv = watch->next_by_event;
/* unlink the watch for its listener */
- prv = &watch->listener->watchs;
+ prv = &listener->watchs;
while(*prv != watch)
prv = &(*prv)->next_by_listener;
*prv = watch->next_by_listener;
*/
static void evt_destroy(struct afb_evt_event *evt)
{
+ struct afb_evt_event **prv;
if (evt != NULL) {
- /* removes all watchers */
- while(evt->watchs != NULL)
- remove_watch(evt->watchs);
- free(evt);
+ /* removes the event if valid! */
+ prv = &events;
+ while (*prv != NULL) {
+ if (*prv != evt)
+ prv = &(*prv)->next;
+ else {
+ /* valid, unlink */
+ *prv = evt->next;
+
+ /* removes all watchers */
+ while(evt->watchs != NULL)
+ remove_watch(evt->watchs);
+
+ /* free */
+ free(evt);
+ break;
+ }
+ }
}
}
size_t len;
struct afb_evt_event *evt;
+ /* allocates the id */
+ do {
+ if (++event_id_counter < 0) {
+ event_id_wrapped = 1;
+ event_id_counter = 1024; /* heuristic: small numbers are not destroyed */
+ }
+ if (!event_id_wrapped)
+ break;
+ evt = events;
+ while(evt != NULL && evt->id != event_id_counter)
+ evt = evt->next;
+ } while (evt != NULL);
+
+ /* allocates the event */
len = strlen(name);
evt = malloc(len + sizeof * evt);
- if (evt != NULL) {
- evt->watchs = NULL;
- memcpy(evt->name, name, len + 1);
- }
+ if (evt == NULL)
+ goto error;
+
+ /* initialize the event */
+ evt->next = events;
+ evt->watchs = NULL;
+ evt->id = event_id_counter;
+ assert(evt->id > 0);
+ memcpy(evt->name, name, len + 1);
+ events = evt;
+
+ /* returns the event */
return (struct afb_event){ .itf = &afb_evt_event_itf, .closure = evt };
+error:
+ return (struct afb_event){ .itf = NULL, .closure = NULL };
}
/*
return (event.itf != &afb_evt_event_itf) ? NULL : ((struct afb_evt_event *)event.closure)->name;
}
+/*
+ * Returns the id of the 'event'
+ */
+int afb_evt_event_id(struct afb_event event)
+{
+ return (event.itf != &afb_evt_event_itf) ? 0 : ((struct afb_evt_event *)event.closure)->id;
+}
+
/*
* Returns an instance of the listener defined by the 'send' callback
* and the 'closure'.
* Returns NULL in case of memory depletion.
*/
-struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, const char *event, struct json_object *object), void *closure)
+struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, void *closure)
{
struct afb_evt_listener *listener;
/* search if an instance already exists */
listener = listeners;
while (listener != NULL) {
- if (listener->send == send && listener->closure == closure)
+ if (listener->itf == itf && listener->closure == closure)
return afb_evt_listener_addref(listener);
listener = listener->next;
}
if (listener != NULL) {
/* init */
listener->next = listeners;
- listener->send = send;
+ listener->itf = itf;
listener->closure = closure;
listener->watchs = NULL;
listener->refcount = 1;
struct afb_evt_event *evt;
/* check parameter */
- if (event.itf != &afb_evt_event_itf) {
+ if (event.itf != &afb_evt_event_itf || listener->itf->push == NULL) {
errno = EINVAL;
return -1;
}
- /* search the existing watch */
+ /* search the existing watch for the listener */
+ evt = event.closure;
watch = listener->watchs;
while(watch != NULL) {
- if (watch->event == event.closure)
- return 0;
+ if (watch->event == evt)
+ goto found;
watch = watch->next_by_listener;
}
}
/* initialise and link */
- evt = event.closure;
watch->event = evt;
watch->next_by_event = evt->watchs;
watch->listener = listener;
watch->next_by_listener = listener->watchs;
+ watch->activity = 0;
evt->watchs = watch;
listener->watchs = watch;
-
+
+found:
+ if (watch->activity == 0 && listener->itf->add != NULL)
+ listener->itf->add(listener->closure, evt->name, evt->id);
+ watch->activity++;
+
return 0;
}
int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event event)
{
struct afb_evt_watch *watch;
+ struct afb_evt_event *evt;
/* check parameter */
if (event.itf != &afb_evt_event_itf) {
}
/* search the existing watch */
+ evt = event.closure;
watch = listener->watchs;
while(watch != NULL) {
- if (watch->event == event.closure) {
+ if (watch->event == evt) {
/* found: remove it */
- remove_watch(watch);
- break;
+ if (watch->activity != 0) {
+ watch->activity--;
+ if (watch->activity == 0 && listener->itf->remove != NULL)
+ listener->itf->remove(listener->closure, evt->name, evt->id);
+ }
+ return 0;
}
watch = watch->next_by_listener;
}
- return 0;
+ errno = ENOENT;
+ return -1;
}
-
};
/* functions for services */
-static void svc_on_event(struct afb_svc *svc, const char *event, struct json_object *object);
+static void svc_on_event(struct afb_svc *svc, const char *event, int eventid, struct json_object *object);
static void svc_call(struct afb_svc *svc, const char *api, const char *verb, struct json_object *args,
void (*callback)(void*, int, struct json_object*), void *closure);
.call = (void*)svc_call
};
+/* the interface for events */
+static const struct afb_evt_itf evt_itf = {
+ .broadcast = (void*)svc_on_event,
+ .push = (void*)svc_on_event
+};
+
/* functions for requests of services */
static void svcreq_addref(struct svc_req *svcreq);
static void svcreq_unref(struct svc_req *svcreq);
if (on_event == NULL)
svc->listener = NULL;
else {
- svc->listener = afb_evt_listener_create((void*)svc_on_event, svc);
+ svc->listener = afb_evt_listener_create(&evt_itf, svc);
if (svc->listener == NULL)
goto error3;
}
/*
* Propagates the event to the service
*/
-static void svc_on_event(struct afb_svc *svc, const char *event, struct json_object *object)
+static void svc_on_event(struct afb_svc *svc, const char *event, int eventid, struct json_object *object)
{
svc->on_event(event, object);
}
/* predeclaration of websocket callbacks */
static void aws_on_hangup(struct afb_ws_json1 *ws, struct afb_wsj1 *wsj1);
static void aws_on_call(struct afb_ws_json1 *ws, const char *api, const char *verb, struct afb_wsj1_msg *msg);
-static void aws_on_event(struct afb_ws_json1 *ws, const char *event, struct json_object *object);
+static void aws_on_event(struct afb_ws_json1 *ws, const char *event, int eventid, struct json_object *object);
/* predeclaration of wsreq callbacks */
static void wsreq_addref(struct afb_wsreq *wsreq);
.subcall = (void*)wsreq_subcall
};
+/* the interface for events */
+static const struct afb_evt_itf evt_itf = {
+ .broadcast = (void*)aws_on_event,
+ .push = (void*)aws_on_event
+};
+
/***************************************************************
****************************************************************
**
if (result->wsj1 == NULL)
goto error3;
- result->listener = afb_evt_listener_create((void*)aws_on_event, result);
+ result->listener = afb_evt_listener_create(&evt_itf, result);
if (result->listener == NULL)
goto error4;
wsreq_unref(wsreq);
}
-static void aws_on_event(struct afb_ws_json1 *aws, const char *event, struct json_object *object)
+static void aws_on_event(struct afb_ws_json1 *aws, const char *event, int eventid, struct json_object *object)
{
afb_wsj1_send_event_j(aws->wsj1, event, afb_msg_json_event(event, object));
}