Fix the logic of unsubscribing to events. It
was not possible before to implment it without
tracking every session and context. It was not
done because of the required complexity.
This implementation ensures that unexpected
events lead to a removal of the listener from the
list of watchers of the events.
The management of the list of watchers is reworked
to free unused memory.
Bug-AGL: SPEC-3069
Change-Id: Ie67372adbde9dcb9dc6c5c2738111d22609e7256
Signed-off-by: Jose Bollo <jose.bollo@iot.bzh>
/* id of the event */
uint16_t id;
/* id of the event */
uint16_t id;
- /* has client? */
- int has_client;
-
/* fullname of the event */
char fullname[];
};
/* fullname of the event */
char fullname[];
};
/* link to the next watcher for the same listener */
struct afb_evt_watch *next_by_listener;
/* link to the next watcher for the same listener */
struct afb_evt_watch *next_by_listener;
-
- /* activity */
- unsigned activity;
*/
static void push_evtid(struct afb_evtid *evtid, struct json_object *object)
{
*/
static void push_evtid(struct afb_evtid *evtid, struct json_object *object)
{
struct afb_evt_watch *watch;
struct afb_evt_listener *listener;
struct afb_evt_watch *watch;
struct afb_evt_listener *listener;
pthread_rwlock_rdlock(&evtid->rwlock);
watch = evtid->watchs;
while(watch) {
listener = watch->listener;
assert(listener->itf->push != NULL);
pthread_rwlock_rdlock(&evtid->rwlock);
watch = evtid->watchs;
while(watch) {
listener = watch->listener;
assert(listener->itf->push != NULL);
- if (watch->activity != 0) {
- listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(object));
- has_client = 1;
- }
+ listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(object));
watch = watch->next_by_evtid;
}
watch = watch->next_by_evtid;
}
- evtid->has_client = has_client;
pthread_rwlock_unlock(&evtid->rwlock);
}
pthread_rwlock_unlock(&evtid->rwlock);
}
struct job_evtid *je;
int rc;
struct job_evtid *je;
int rc;
+ if (!evtid->watchs)
+ return 0;
+
je = make_job_evtid(evtid, object);
if (je == NULL) {
ERROR("Cant't create push evtid job item for %s(%s)",
je = make_job_evtid(evtid, object);
if (je == NULL) {
ERROR("Cant't create push evtid job item for %s(%s)",
rc = jobs_queue(PUSH_JOB_GROUP, 0, push_job_evtid, je);
if (rc == 0)
rc = jobs_queue(PUSH_JOB_GROUP, 0, push_job_evtid, je);
if (rc == 0)
- rc = evtid->has_client;
else {
ERROR("cant't queue push evtid job item for %s(%s)",
evtid->fullname, json_object_to_json_string(object));
else {
ERROR("cant't queue push evtid job item for %s(%s)",
evtid->fullname, json_object_to_json_string(object));
-/*
- * remove the 'watch'
- */
-static void remove_watch(struct afb_evt_watch *watch)
+static void unwatch(struct afb_evt_listener *listener, struct afb_evtid *evtid, int remove)
+{
+ /* notify listener if needed */
+ if (remove && listener->itf->remove != NULL)
+ listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
+}
+
+static void evtid_unwatch(struct afb_evtid *evtid, struct afb_evt_listener *listener, struct afb_evt_watch *watch, int remove)
{
struct afb_evt_watch **prv;
{
struct afb_evt_watch **prv;
- struct afb_evtid *evtid;
- struct afb_evt_listener *listener;
/* notify listener if needed */
/* notify listener if needed */
- evtid = watch->evtid;
- listener = watch->listener;
- if (watch->activity != 0 && listener->itf->remove != NULL)
- listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
+ unwatch(listener, evtid, remove);
/* unlink the watch for its event */
/* unlink the watch for its event */
- prv = &evtid->watchs;
- while(*prv != watch)
- prv = &(*prv)->next_by_evtid;
- *prv = watch->next_by_evtid;
-
- /* unlink the watch for its listener */
+ pthread_rwlock_wrlock(&listener->rwlock);
+ while(*prv) {
+ if (*prv == watch) {
+ *prv = watch->next_by_listener;
+ break;
+ }
prv = &(*prv)->next_by_listener;
prv = &(*prv)->next_by_listener;
- *prv = watch->next_by_listener;
+ }
+ pthread_rwlock_unlock(&listener->rwlock);
+
+ /* recycle memory */
+ free(watch);
+}
+
+static void listener_unwatch(struct afb_evt_listener *listener, struct afb_evtid *evtid, struct afb_evt_watch *watch, int remove)
+{
+ struct afb_evt_watch **prv;
+
+ /* notify listener if needed */
+ unwatch(listener, evtid, remove);
+
+ /* unlink the watch for its event */
+ pthread_rwlock_wrlock(&evtid->rwlock);
+ prv = &evtid->watchs;
+ while(*prv) {
+ if (*prv == watch) {
+ *prv = watch->next_by_evtid;
+ break;
+ }
+ prv = &(*prv)->next_by_evtid;
+ }
+ pthread_rwlock_unlock(&evtid->rwlock);
/* recycle memory */
free(watch);
/* recycle memory */
free(watch);
evtid->refcount = 1;
evtid->watchs = NULL;
evtid->id = id;
evtid->refcount = 1;
evtid->watchs = NULL;
evtid->id = id;
pthread_rwlock_init(&evtid->rwlock, NULL);
evtids = evtid;
#if WITH_AFB_HOOK
pthread_rwlock_init(&evtid->rwlock, NULL);
evtids = evtid;
#if WITH_AFB_HOOK
*/
void afb_evt_evtid_unref(struct afb_evtid *evtid)
{
*/
void afb_evt_evtid_unref(struct afb_evtid *evtid)
{
- int found;
- struct afb_evtid **prv;
- struct afb_evt_listener *listener;
+ struct afb_evtid **prv, *oev;
+ struct afb_evt_watch *watch, *nwatch;
if (!__atomic_sub_fetch(&evtid->refcount, 1, __ATOMIC_RELAXED)) {
/* unlinks the event if valid! */
pthread_rwlock_wrlock(&events_rwlock);
if (!__atomic_sub_fetch(&evtid->refcount, 1, __ATOMIC_RELAXED)) {
/* unlinks the event if valid! */
pthread_rwlock_wrlock(&events_rwlock);
- while (*prv && !(found = (*prv == evtid)))
- prv = &(*prv)->next;
- if (found) {
- *prv = evtid->next;
- event_count--;
+ for(;;) {
+ oev = *prv;
+ if (oev == evtid)
+ break;
+ if (!oev) {
+ ERROR("unexpected event");
+ pthread_rwlock_unlock(&events_rwlock);
+ return;
+ }
+ prv = &oev->next;
+ event_count--;
+ *prv = evtid->next;
pthread_rwlock_unlock(&events_rwlock);
pthread_rwlock_unlock(&events_rwlock);
- /* destroys the event */
- if (!found)
- ERROR("event not found");
- else {
- /* removes all watchers */
- while(evtid->watchs != NULL) {
- listener = evtid->watchs->listener;
- pthread_rwlock_wrlock(&listener->rwlock);
- pthread_rwlock_wrlock(&evtid->rwlock);
- remove_watch(evtid->watchs);
- pthread_rwlock_unlock(&evtid->rwlock);
- pthread_rwlock_unlock(&listener->rwlock);
- }
-
- /* free */
- pthread_rwlock_destroy(&evtid->rwlock);
- free(evtid);
+ /* removes all watchers */
+ pthread_rwlock_wrlock(&evtid->rwlock);
+ watch = evtid->watchs;
+ evtid->watchs = NULL;
+ pthread_rwlock_unlock(&evtid->rwlock);
+ while(watch) {
+ nwatch = watch->next_by_evtid;
+ evtid_unwatch(evtid, watch->listener, watch, 1);
+ watch = nwatch;
+
+ /* free */
+ pthread_rwlock_destroy(&evtid->rwlock);
+ free(evtid);
*/
void afb_evt_listener_unref(struct afb_evt_listener *listener)
{
*/
void afb_evt_listener_unref(struct afb_evt_listener *listener)
{
- struct afb_evt_listener **prv;
- struct afb_evtid *evtid;
+ struct afb_evt_listener **prv, *olis;
if (listener && !__atomic_sub_fetch(&listener->refcount, 1, __ATOMIC_RELAXED)) {
/* unlink the listener */
pthread_rwlock_wrlock(&listeners_rwlock);
prv = &listeners;
if (listener && !__atomic_sub_fetch(&listener->refcount, 1, __ATOMIC_RELAXED)) {
/* unlink the listener */
pthread_rwlock_wrlock(&listeners_rwlock);
prv = &listeners;
- while (*prv != listener)
- prv = &(*prv)->next;
+ for(;;) {
+ olis = *prv;
+ if (olis == listener)
+ break;
+ if (!olis) {
+ ERROR("unexpected listener");
+ pthread_rwlock_unlock(&listeners_rwlock);
+ return;
+ }
+ prv = &olis->next;
+ }
*prv = listener->next;
pthread_rwlock_unlock(&listeners_rwlock);
/* remove the watchers */
*prv = listener->next;
pthread_rwlock_unlock(&listeners_rwlock);
/* remove the watchers */
- pthread_rwlock_wrlock(&listener->rwlock);
- while (listener->watchs != NULL) {
- evtid = listener->watchs->evtid;
- pthread_rwlock_wrlock(&evtid->rwlock);
- remove_watch(listener->watchs);
- pthread_rwlock_unlock(&evtid->rwlock);
- }
- pthread_rwlock_unlock(&listener->rwlock);
+ afb_evt_listener_unwatch_all(listener, 0);
/* free the listener */
pthread_rwlock_destroy(&listener->rwlock);
/* free the listener */
pthread_rwlock_destroy(&listener->rwlock);
watch = listener->watchs;
while(watch != NULL) {
if (watch->evtid == evtid)
watch = listener->watchs;
while(watch != NULL) {
if (watch->evtid == evtid)
watch = watch->next_by_listener;
}
watch = watch->next_by_listener;
}
/* initialise and link */
watch->evtid = evtid;
/* initialise and link */
watch->evtid = evtid;
watch->listener = listener;
watch->next_by_listener = listener->watchs;
listener->watchs = watch;
watch->listener = listener;
watch->next_by_listener = listener->watchs;
listener->watchs = watch;
evtid->watchs = watch;
pthread_rwlock_unlock(&evtid->rwlock);
evtid->watchs = watch;
pthread_rwlock_unlock(&evtid->rwlock);
-found:
- if (watch->activity == 0 && listener->itf->add != NULL)
+ if (listener->itf->add != NULL)
listener->itf->add(listener->closure, evtid->fullname, evtid->id);
listener->itf->add(listener->closure, evtid->fullname, evtid->id);
- watch->activity++;
- evtid->has_client = 1;
pthread_rwlock_unlock(&listener->rwlock);
pthread_rwlock_unlock(&listener->rwlock);
*/
int afb_evt_listener_unwatch_evt(struct afb_evt_listener *listener, struct afb_evtid *evtid)
{
*/
int afb_evt_listener_unwatch_evt(struct afb_evt_listener *listener, struct afb_evtid *evtid)
{
- struct afb_evt_watch *watch;
+ struct afb_evt_watch *watch, **pwatch;
/* search the existing watch */
pthread_rwlock_wrlock(&listener->rwlock);
/* search the existing watch */
pthread_rwlock_wrlock(&listener->rwlock);
- watch = listener->watchs;
- while(watch != NULL) {
- if (watch->evtid == evtid) {
- if (watch->activity != 0) {
- watch->activity--;
- if (watch->activity == 0 && listener->itf->remove != NULL)
- listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
- }
+ pwatch = &listener->watchs;
+ for (;;) {
+ watch = *pwatch;
+ if (!watch) {
+ pthread_rwlock_unlock(&listener->rwlock);
+ errno = ENOENT;
+ return -1;
+ }
+ if (evtid == watch->evtid) {
+ *pwatch = watch->next_by_listener;
pthread_rwlock_unlock(&listener->rwlock);
pthread_rwlock_unlock(&listener->rwlock);
+ listener_unwatch(listener, evtid, watch, 1);
- watch = watch->next_by_listener;
+ pwatch = &watch->next_by_listener;
- pthread_rwlock_unlock(&listener->rwlock);
- errno = ENOENT;
- return -1;
*/
int afb_evt_listener_unwatch_id(struct afb_evt_listener *listener, uint16_t eventid)
{
*/
int afb_evt_listener_unwatch_id(struct afb_evt_listener *listener, uint16_t eventid)
{
- struct afb_evt_watch *watch;
+ struct afb_evt_watch *watch, **pwatch;
struct afb_evtid *evtid;
/* search the existing watch */
pthread_rwlock_wrlock(&listener->rwlock);
struct afb_evtid *evtid;
/* search the existing watch */
pthread_rwlock_wrlock(&listener->rwlock);
- watch = listener->watchs;
- while(watch != NULL) {
+ pwatch = &listener->watchs;
+ for (;;) {
+ watch = *pwatch;
+ if (!watch) {
+ pthread_rwlock_unlock(&listener->rwlock);
+ errno = ENOENT;
+ return -1;
+ }
evtid = watch->evtid;
if (evtid->id == eventid) {
evtid = watch->evtid;
if (evtid->id == eventid) {
- if (watch->activity != 0) {
- watch->activity--;
- if (watch->activity == 0 && listener->itf->remove != NULL)
- listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
- }
+ *pwatch = watch->next_by_listener;
pthread_rwlock_unlock(&listener->rwlock);
pthread_rwlock_unlock(&listener->rwlock);
+ listener_unwatch(listener, evtid, watch, 1);
- watch = watch->next_by_listener;
+ pwatch = &watch->next_by_listener;
+}
+
+/*
+ * Avoids the 'listener' to watch any event, calling the callback
+ * 'remove' of the interface if 'remoe' is not zero.
+ */
+void afb_evt_listener_unwatch_all(struct afb_evt_listener *listener, int remove)
+{
+ struct afb_evt_watch *watch, *nwatch;
+
+ /* search the existing watch */
+ pthread_rwlock_wrlock(&listener->rwlock);
+ watch = listener->watchs;
+ listener->watchs = NULL;
pthread_rwlock_unlock(&listener->rwlock);
pthread_rwlock_unlock(&listener->rwlock);
- errno = ENOENT;
- return -1;
+ while(watch) {
+ nwatch = watch->next_by_listener;
+ listener_unwatch(listener, watch->evtid, watch, remove);
+ watch = nwatch;
+ }
extern int afb_evt_listener_watch_evt(struct afb_evt_listener *listener, struct afb_evtid *evtid);
extern int afb_evt_listener_unwatch_evt(struct afb_evt_listener *listener, struct afb_evtid *evtid);
extern int afb_evt_listener_unwatch_id(struct afb_evt_listener *listener, uint16_t eventid);
extern int afb_evt_listener_watch_evt(struct afb_evt_listener *listener, struct afb_evtid *evtid);
extern int afb_evt_listener_unwatch_evt(struct afb_evt_listener *listener, struct afb_evtid *evtid);
extern int afb_evt_listener_unwatch_id(struct afb_evt_listener *listener, uint16_t eventid);
+extern void afb_evt_listener_unwatch_all(struct afb_evt_listener *listener, int remove);
extern struct afb_event_x2 *afb_evt_event_x2_create(const char *fullname);
extern struct afb_event_x2 *afb_evt_event_x2_create2(const char *prefix, const char *name);
extern struct afb_event_x2 *afb_evt_event_x2_create(const char *fullname);
extern struct afb_event_x2 *afb_evt_event_x2_create2(const char *prefix, const char *name);
static int server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event_x2 *event)
{
static int server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event_x2 *event)
{
struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
rc = afb_proto_ws_call_unsubscribe(wreq->call, afb_evt_event_x2_id(event));
struct server_req *wreq = CONTAINER_OF_XREQ(struct server_req, xreq);
rc = afb_proto_ws_call_unsubscribe(wreq->call, afb_evt_event_x2_id(event));
- rc2 = afb_evt_listener_unwatch_x2(wreq->stubws->listener, event);
- if (rc >= 0 && rc2 < 0)
- rc = rc2;
if (rc < 0)
ERROR("error while unsubscribing event");
return rc;
if (rc < 0)
ERROR("error while unsubscribing event");
return rc;