Tag broadcasted events with UUID and hop
[src/app-framework-binder.git] / src / afb-evt.c
1 /*
2  * Copyright (C) 2015-2018 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <string.h>
22 #include <assert.h>
23 #include <errno.h>
24 #include <pthread.h>
25
26 #include <json-c/json.h>
27 #include <afb/afb-event-x2-itf.h>
28 #include <afb/afb-event-x1.h>
29
30 #include "afb-evt.h"
31 #include "afb-hook.h"
32 #include "verbose.h"
33 #include "jobs.h"
34 #include "uuid.h"
35
36 struct afb_evt_watch;
37
38 /*
39  * Structure for event listeners
40  */
41 struct afb_evt_listener {
42
43         /* chaining listeners */
44         struct afb_evt_listener *next;
45
46         /* interface for callbacks */
47         const struct afb_evt_itf *itf;
48
49         /* closure for the callback */
50         void *closure;
51
52         /* head of the list of events listened */
53         struct afb_evt_watch *watchs;
54
55         /* rwlock of the listener */
56         pthread_rwlock_t rwlock;
57
58         /* count of reference to the listener */
59         int refcount;
60 };
61
62 /*
63  * Structure for describing events
64  */
65 struct afb_evtid {
66
67         /* interface */
68         struct afb_event_x2 eventid;
69
70         /* next event */
71         struct afb_evtid *next;
72
73         /* head of the list of listeners watching the event */
74         struct afb_evt_watch *watchs;
75
76         /* rwlock of the event */
77         pthread_rwlock_t rwlock;
78
79         /* hooking */
80         int hookflags;
81
82         /* refcount */
83         int refcount;
84
85         /* id of the event */
86         int id;
87
88         /* has client? */
89         int has_client;
90
91         /* fullname of the event */
92         char fullname[];
93 };
94
95 /*
96  * Structure for associating events and listeners
97  */
98 struct afb_evt_watch {
99
100         /* the evtid */
101         struct afb_evtid *evtid;
102
103         /* link to the next watcher for the same evtid */
104         struct afb_evt_watch *next_by_evtid;
105
106         /* the listener */
107         struct afb_evt_listener *listener;
108
109         /* link to the next watcher for the same listener */
110         struct afb_evt_watch *next_by_listener;
111
112         /* activity */
113         unsigned activity;
114 };
115
116 /*
117  * structure for job of broadcasting events
118  */
119 struct job_broadcast
120 {
121         /** object atached to the event */
122         struct json_object *object;
123
124         /** the uuid of the event */
125         uuid_binary_t  uuid;
126
127         /** remaining hop */
128         uint8_t hop;
129
130         /** name of the event to broadcast */
131         char event[];
132 };
133
134 /*
135  * structure for job of broadcasting or pushing events
136  */
137 struct job_evtid
138 {
139         /** the event to broadcast */
140         struct afb_evtid *evtid;
141
142         /** object atached to the event */
143         struct json_object *object;
144 };
145
146 /* the interface for events */
147 static struct afb_event_x2_itf afb_evt_event_x2_itf = {
148         .broadcast = (void*)afb_evt_evtid_broadcast,
149         .push = (void*)afb_evt_evtid_push,
150         .unref = (void*)afb_evt_evtid_unref,
151         .name = (void*)afb_evt_evtid_name,
152         .addref = (void*)afb_evt_evtid_addref
153 };
154
155 /* the interface for events */
156 static struct afb_event_x2_itf afb_evt_hooked_eventid_itf = {
157         .broadcast = (void*)afb_evt_evtid_hooked_broadcast,
158         .push = (void*)afb_evt_evtid_hooked_push,
159         .unref = (void*)afb_evt_evtid_hooked_unref,
160         .name = (void*)afb_evt_evtid_hooked_name,
161         .addref = (void*)afb_evt_evtid_hooked_addref
162 };
163
164 /* job groups for events push/broadcast */
165 #define BROADCAST_JOB_GROUP  (&afb_evt_event_x2_itf)
166 #define PUSH_JOB_GROUP       (&afb_evt_event_x2_itf)
167
168 /* head of the list of listeners */
169 static pthread_rwlock_t listeners_rwlock = PTHREAD_RWLOCK_INITIALIZER;
170 static struct afb_evt_listener *listeners = NULL;
171
172 /* handling id of events */
173 static pthread_rwlock_t events_rwlock = PTHREAD_RWLOCK_INITIALIZER;
174 static struct afb_evtid *evtids = NULL;
175 static int event_id_counter = 0;
176 static int event_id_wrapped = 0;
177
178 /* head of uniqueness of events */
179 #if !defined(EVENT_BROADCAST_HOP_MAX)
180 #  define EVENT_BROADCAST_HOP_MAX  10
181 #endif
182 #if !defined(EVENT_BROADCAST_MEMORY_COUNT)
183 #  define EVENT_BROADCAST_MEMORY_COUNT  8
184 #endif
185
186 #if EVENT_BROADCAST_MEMORY_COUNT
187 static struct {
188         pthread_mutex_t mutex;
189         uint8_t base;
190         uint8_t count;
191         uuid_binary_t uuids[EVENT_BROADCAST_MEMORY_COUNT];
192 } uniqueness = {
193         .mutex = PTHREAD_MUTEX_INITIALIZER,
194         .base = 0,
195         .count = 0
196 };
197 #endif
198
199 /*
200  * Create structure for job of broadcasting string 'event' with 'object'
201  * Returns the created structure or NULL if out of memory
202  */
203 static struct job_broadcast *make_job_broadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop)
204 {
205         size_t sz = 1 + strlen(event);
206         struct job_broadcast *jb = malloc(sz + sizeof *jb);
207         if (jb) {
208                 jb->object = object;
209                 memcpy(jb->uuid, uuid, sizeof jb->uuid);
210                 jb->hop = hop;
211                 memcpy(jb->event, event, sz);
212         }
213         return jb;
214 }
215
216 /*
217  * Destroy structure 'jb' for job of broadcasting string events
218  */
219 static void destroy_job_broadcast(struct job_broadcast *jb)
220 {
221         json_object_put(jb->object);
222         free(jb);
223 }
224
225 /*
226  * Create structure for job of broadcasting or pushing 'evtid' with 'object'
227  * Returns the created structure or NULL if out of memory
228  */
229 static struct job_evtid *make_job_evtid(struct afb_evtid *evtid, struct json_object *object)
230 {
231         struct job_evtid *je = malloc(sizeof *je);
232         if (je) {
233                 je->evtid = afb_evt_evtid_addref(evtid);
234                 je->object = object;
235         }
236         return je;
237 }
238
239 /*
240  * Destroy structure for job of broadcasting or pushing evtid
241  */
242 static void destroy_job_evtid(struct job_evtid *je)
243 {
244         afb_evt_evtid_unref(je->evtid);
245         json_object_put(je->object);
246         free(je);
247 }
248
249 /*
250  * Broadcasts the 'event' of 'id' with its 'object'
251  */
252 static void broadcast(struct job_broadcast *jb)
253 {
254         struct afb_evt_listener *listener;
255
256         pthread_rwlock_rdlock(&listeners_rwlock);
257         listener = listeners;
258         while(listener) {
259                 if (listener->itf->broadcast != NULL)
260                         listener->itf->broadcast(listener->closure, jb->event, json_object_get(jb->object), jb->uuid, jb->hop);
261                 listener = listener->next;
262         }
263         pthread_rwlock_unlock(&listeners_rwlock);
264 }
265
266 /*
267  * Jobs callback for broadcasting string asynchronously
268  */
269 static void broadcast_job(int signum, void *closure)
270 {
271         struct job_broadcast *jb = closure;
272
273         if (signum == 0)
274                 broadcast(jb);
275         destroy_job_broadcast(jb);
276 }
277
278 /*
279  * Broadcasts the string 'event' with its 'object'
280  */
281 static int unhooked_broadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop)
282 {
283         uuid_binary_t local_uuid;
284         struct job_broadcast *jb;
285         int rc;
286 #if EVENT_BROADCAST_MEMORY_COUNT
287         int iter, count;
288 #endif
289
290         /* check if lately sent */
291         if (!uuid) {
292                 uuid_new_binary(local_uuid);
293                 uuid = local_uuid;
294                 hop = EVENT_BROADCAST_HOP_MAX;
295 #if EVENT_BROADCAST_MEMORY_COUNT
296                 pthread_mutex_lock(&uniqueness.mutex);
297         } else {
298                 pthread_mutex_lock(&uniqueness.mutex);
299                 iter = (int)uniqueness.base;
300                 count = (int)uniqueness.count;
301                 while (count) {
302                         if (0 == memcmp(uuid, uniqueness.uuids[iter], sizeof(uuid_binary_t))) {
303                                 pthread_mutex_unlock(&uniqueness.mutex);
304                                 return 0;
305                         }
306                         if (++iter == EVENT_BROADCAST_MEMORY_COUNT)
307                                 iter = 0;
308                         count--;
309                 }
310         }
311         iter = (int)uniqueness.base;
312         if (uniqueness.count < EVENT_BROADCAST_MEMORY_COUNT)
313                 iter += (int)(uniqueness.count++);
314         else if (++uniqueness.base == EVENT_BROADCAST_MEMORY_COUNT)
315                 uniqueness.base = 0;
316         memcpy(uniqueness.uuids[iter], uuid, sizeof(uuid_binary_t));
317         pthread_mutex_unlock(&uniqueness.mutex);
318 #else
319         }
320 #endif
321
322         /* create the structure for the job */
323         jb = make_job_broadcast(event, object, uuid, hop);
324         if (jb == NULL) {
325                 ERROR("Cant't create broadcast string job item for %s(%s)",
326                         event, json_object_to_json_string(object));
327                 json_object_put(object);
328                 return -1;
329         }
330
331         /* queue the job */
332         rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job, jb);
333         if (rc) {
334                 ERROR("cant't queue broadcast string job item for %s(%s)",
335                         event, json_object_to_json_string(object));
336                 destroy_job_broadcast(jb);
337         }
338         return rc;
339 }
340
341 /*
342  * Broadcasts the event 'evtid' with its 'object'
343  * 'object' is released (like json_object_put)
344  * Returns the count of listener that received the event.
345  */
346 int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object)
347 {
348         return unhooked_broadcast(evtid->fullname, object, NULL, 0);
349 }
350
351 /*
352  * Broadcasts the event 'evtid' with its 'object'
353  * 'object' is released (like json_object_put)
354  * Returns the count of listener that received the event.
355  */
356 int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *object)
357 {
358         int result;
359
360         json_object_get(object);
361
362         if (evtid->hookflags & afb_hook_flag_evt_broadcast_before)
363                 afb_hook_evt_broadcast_before(evtid->fullname, evtid->id, object);
364
365         result = afb_evt_evtid_broadcast(evtid, object);
366
367         if (evtid->hookflags & afb_hook_flag_evt_broadcast_after)
368                 afb_hook_evt_broadcast_after(evtid->fullname, evtid->id, object, result);
369
370         json_object_put(object);
371
372         return result;
373 }
374
375 int afb_evt_rebroadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop)
376 {
377         int result;
378
379 #if WITH_AFB_HOOK
380         json_object_get(object);
381         afb_hook_evt_broadcast_before(event, 0, object);
382 #endif
383
384         result = unhooked_broadcast(event, object, uuid, hop);
385
386 #if WITH_AFB_HOOK
387         afb_hook_evt_broadcast_after(event, 0, object, result);
388         json_object_put(object);
389 #endif
390         return result;
391 }
392
393 /*
394  * Broadcasts the 'event' with its 'object'
395  * 'object' is released (like json_object_put)
396  * Returns the count of listener having receive the event.
397  */
398 int afb_evt_broadcast(const char *event, struct json_object *object)
399 {
400         return afb_evt_rebroadcast(event, object, NULL, 0);
401 }
402
403 /*
404  * Pushes the event 'evtid' with 'obj' to its listeners
405  * Returns the count of listener that received the event.
406  */
407 static void push_evtid(struct afb_evtid *evtid, struct json_object *object)
408 {
409         int has_client;
410         struct afb_evt_watch *watch;
411         struct afb_evt_listener *listener;
412
413         has_client = 0;
414         pthread_rwlock_rdlock(&evtid->rwlock);
415         watch = evtid->watchs;
416         while(watch) {
417                 listener = watch->listener;
418                 assert(listener->itf->push != NULL);
419                 if (watch->activity != 0) {
420                         listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(object));
421                         has_client = 1;
422                 }
423                 watch = watch->next_by_evtid;
424         }
425         evtid->has_client = has_client;
426         pthread_rwlock_unlock(&evtid->rwlock);
427 }
428
429 /*
430  * Jobs callback for pushing evtid asynchronously
431  */
432 static void push_job_evtid(int signum, void *closure)
433 {
434         struct job_evtid *je = closure;
435
436         if (signum == 0)
437                 push_evtid(je->evtid, je->object);
438         destroy_job_evtid(je);
439 }
440
441 /*
442  * Pushes the event 'evtid' with 'obj' to its listeners
443  * 'obj' is released (like json_object_put)
444  * Returns 1 if at least one listener exists or 0 if no listener exists or
445  * -1 in case of error and the event can't be delivered
446  */
447 int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *object)
448 {
449         struct job_evtid *je;
450         int rc;
451
452         je = make_job_evtid(evtid, object);
453         if (je == NULL) {
454                 ERROR("Cant't create push evtid job item for %s(%s)",
455                         evtid->fullname, json_object_to_json_string(object));
456                 json_object_put(object);
457                 return -1;
458         }
459
460         rc = jobs_queue(PUSH_JOB_GROUP, 0, push_job_evtid, je);
461         if (rc == 0)
462                 rc = evtid->has_client;
463         else {
464                 ERROR("cant't queue push evtid job item for %s(%s)",
465                         evtid->fullname, json_object_to_json_string(object));
466                 destroy_job_evtid(je);
467         }
468
469         return rc;
470 }
471
472 /*
473  * Pushes the event 'evtid' with 'obj' to its listeners
474  * 'obj' is released (like json_object_put)
475  * Emits calls to hooks.
476  * Returns the count of listener taht received the event.
477  */
478 int afb_evt_evtid_hooked_push(struct afb_evtid *evtid, struct json_object *obj)
479 {
480
481         int result;
482
483         /* lease the object */
484         json_object_get(obj);
485
486         /* hook before push */
487         if (evtid->hookflags & afb_hook_flag_evt_push_before)
488                 afb_hook_evt_push_before(evtid->fullname, evtid->id, obj);
489
490         /* push */
491         result = afb_evt_evtid_push(evtid, obj);
492
493         /* hook after push */
494         if (evtid->hookflags & afb_hook_flag_evt_push_after)
495                 afb_hook_evt_push_after(evtid->fullname, evtid->id, obj, result);
496
497         /* release the object */
498         json_object_put(obj);
499         return result;
500 }
501
502 /*
503  * remove the 'watch'
504  */
505 static void remove_watch(struct afb_evt_watch *watch)
506 {
507         struct afb_evt_watch **prv;
508         struct afb_evtid *evtid;
509         struct afb_evt_listener *listener;
510
511         /* notify listener if needed */
512         evtid = watch->evtid;
513         listener = watch->listener;
514         if (watch->activity != 0 && listener->itf->remove != NULL)
515                 listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
516
517         /* unlink the watch for its event */
518         prv = &evtid->watchs;
519         while(*prv != watch)
520                 prv = &(*prv)->next_by_evtid;
521         *prv = watch->next_by_evtid;
522
523         /* unlink the watch for its listener */
524         prv = &listener->watchs;
525         while(*prv != watch)
526                 prv = &(*prv)->next_by_listener;
527         *prv = watch->next_by_listener;
528
529         /* recycle memory */
530         free(watch);
531 }
532
533 /*
534  * Creates an event of name 'fullname' and returns it or NULL on error.
535  */
536 struct afb_evtid *afb_evt_evtid_create(const char *fullname)
537 {
538         size_t len;
539         struct afb_evtid *evtid, *oevt;
540
541         /* allocates the event */
542         len = strlen(fullname);
543         evtid = malloc(len + 1 + sizeof * evtid);
544         if (evtid == NULL)
545                 goto error;
546
547         /* allocates the id */
548         pthread_rwlock_wrlock(&events_rwlock);
549         do {
550                 if (++event_id_counter < 0) {
551                         event_id_wrapped = 1;
552                         event_id_counter = 1024; /* heuristic: small numbers are not destroyed */
553                 }
554                 if (!event_id_wrapped)
555                         break;
556                 oevt = evtids;
557                 while(oevt != NULL && oevt->id != event_id_counter)
558                         oevt = oevt->next;
559         } while (oevt != NULL);
560
561         /* initialize the event */
562         memcpy(evtid->fullname, fullname, len + 1);
563         evtid->next = evtids;
564         evtid->refcount = 1;
565         evtid->watchs = NULL;
566         evtid->id = event_id_counter;
567         evtid->has_client = 0;
568         pthread_rwlock_init(&evtid->rwlock, NULL);
569         evtids = evtid;
570         evtid->hookflags = afb_hook_flags_evt(evtid->fullname);
571         evtid->eventid.itf = evtid->hookflags ? &afb_evt_hooked_eventid_itf : &afb_evt_event_x2_itf;
572         if (evtid->hookflags & afb_hook_flag_evt_create)
573                 afb_hook_evt_create(evtid->fullname, evtid->id);
574         pthread_rwlock_unlock(&events_rwlock);
575
576         /* returns the event */
577         return evtid;
578 error:
579         return NULL;
580 }
581
582 /*
583  * Creates an event of name 'prefix'/'name' and returns it or NULL on error.
584  */
585 struct afb_evtid *afb_evt_evtid_create2(const char *prefix, const char *name)
586 {
587         size_t prelen, postlen;
588         char *fullname;
589
590         /* makes the event fullname */
591         prelen = strlen(prefix);
592         postlen = strlen(name);
593         fullname = alloca(prelen + postlen + 2);
594         memcpy(fullname, prefix, prelen);
595         fullname[prelen] = '/';
596         memcpy(fullname + prelen + 1, name, postlen + 1);
597
598         /* create the event */
599         return afb_evt_evtid_create(fullname);
600 }
601
602 /*
603  * increment the reference count of the event 'evtid'
604  */
605 struct afb_evtid *afb_evt_evtid_addref(struct afb_evtid *evtid)
606 {
607         __atomic_add_fetch(&evtid->refcount, 1, __ATOMIC_RELAXED);
608         return evtid;
609 }
610
611 /*
612  * increment the reference count of the event 'evtid'
613  */
614 struct afb_evtid *afb_evt_evtid_hooked_addref(struct afb_evtid *evtid)
615 {
616         if (evtid->hookflags & afb_hook_flag_evt_addref)
617                 afb_hook_evt_addref(evtid->fullname, evtid->id);
618         return afb_evt_evtid_addref(evtid);
619 }
620
621 /*
622  * decrement the reference count of the event 'evtid'
623  * and destroy it when the count reachs zero
624  */
625 void afb_evt_evtid_unref(struct afb_evtid *evtid)
626 {
627         int found;
628         struct afb_evtid **prv;
629         struct afb_evt_listener *listener;
630
631         if (!__atomic_sub_fetch(&evtid->refcount, 1, __ATOMIC_RELAXED)) {
632                 /* unlinks the event if valid! */
633                 pthread_rwlock_wrlock(&events_rwlock);
634                 found = 0;
635                 prv = &evtids;
636                 while (*prv && !(found = (*prv == evtid)))
637                         prv = &(*prv)->next;
638                 if (found)
639                         *prv = evtid->next;
640                 pthread_rwlock_unlock(&events_rwlock);
641
642                 /* destroys the event */
643                 if (!found)
644                         ERROR("event not found");
645                 else {
646                         /* removes all watchers */
647                         while(evtid->watchs != NULL) {
648                                 listener = evtid->watchs->listener;
649                                 pthread_rwlock_wrlock(&listener->rwlock);
650                                 pthread_rwlock_wrlock(&evtid->rwlock);
651                                 remove_watch(evtid->watchs);
652                                 pthread_rwlock_unlock(&evtid->rwlock);
653                                 pthread_rwlock_unlock(&listener->rwlock);
654                         }
655
656                         /* free */
657                         pthread_rwlock_destroy(&evtid->rwlock);
658                         free(evtid);
659                 }
660         }
661 }
662
663 /*
664  * decrement the reference count of the event 'evtid'
665  * and destroy it when the count reachs zero
666  */
667 void afb_evt_evtid_hooked_unref(struct afb_evtid *evtid)
668 {
669         if (evtid->hookflags & afb_hook_flag_evt_unref)
670                 afb_hook_evt_unref(evtid->fullname, evtid->id);
671         afb_evt_evtid_unref(evtid);
672 }
673
674 /*
675  * Returns the true name of the 'event'
676  */
677 const char *afb_evt_evtid_fullname(struct afb_evtid *evtid)
678 {
679         return evtid->fullname;
680 }
681
682 /*
683  * Returns the name of the 'event'
684  */
685 const char *afb_evt_evtid_name(struct afb_evtid *evtid)
686 {
687         const char *name = strchr(evtid->fullname, '/');
688         return name ? name + 1 : evtid->fullname;
689 }
690
691 /*
692  * Returns the name associated to the event 'evtid'.
693  */
694 const char *afb_evt_evtid_hooked_name(struct afb_evtid *evtid)
695 {
696         const char *result = afb_evt_evtid_name(evtid);
697         if (evtid->hookflags & afb_hook_flag_evt_name)
698                 afb_hook_evt_name(evtid->fullname, evtid->id, result);
699         return result;
700 }
701
702 /*
703  * Returns the id of the 'event'
704  */
705 int afb_evt_evtid_id(struct afb_evtid *evtid)
706 {
707         return evtid->id;
708 }
709
710 /*
711  * Returns an instance of the listener defined by the 'send' callback
712  * and the 'closure'.
713  * Returns NULL in case of memory depletion.
714  */
715 struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, void *closure)
716 {
717         struct afb_evt_listener *listener;
718
719         /* search if an instance already exists */
720         pthread_rwlock_wrlock(&listeners_rwlock);
721         listener = listeners;
722         while (listener != NULL) {
723                 if (listener->itf == itf && listener->closure == closure) {
724                         listener = afb_evt_listener_addref(listener);
725                         goto found;
726                 }
727                 listener = listener->next;
728         }
729
730         /* allocates */
731         listener = calloc(1, sizeof *listener);
732         if (listener != NULL) {
733                 /* init */
734                 listener->itf = itf;
735                 listener->closure = closure;
736                 listener->watchs = NULL;
737                 listener->refcount = 1;
738                 pthread_rwlock_init(&listener->rwlock, NULL);
739                 listener->next = listeners;
740                 listeners = listener;
741         }
742  found:
743         pthread_rwlock_unlock(&listeners_rwlock);
744         return listener;
745 }
746
747 /*
748  * Increases the reference count of 'listener' and returns it
749  */
750 struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener)
751 {
752         __atomic_add_fetch(&listener->refcount, 1, __ATOMIC_RELAXED);
753         return listener;
754 }
755
756 /*
757  * Decreases the reference count of the 'listener' and destroys it
758  * when no more used.
759  */
760 void afb_evt_listener_unref(struct afb_evt_listener *listener)
761 {
762         struct afb_evt_listener **prv;
763         struct afb_evtid *evtid;
764
765         if (listener && !__atomic_sub_fetch(&listener->refcount, 1, __ATOMIC_RELAXED)) {
766
767                 /* unlink the listener */
768                 pthread_rwlock_wrlock(&listeners_rwlock);
769                 prv = &listeners;
770                 while (*prv != listener)
771                         prv = &(*prv)->next;
772                 *prv = listener->next;
773                 pthread_rwlock_unlock(&listeners_rwlock);
774
775                 /* remove the watchers */
776                 pthread_rwlock_wrlock(&listener->rwlock);
777                 while (listener->watchs != NULL) {
778                         evtid = listener->watchs->evtid;
779                         pthread_rwlock_wrlock(&evtid->rwlock);
780                         remove_watch(listener->watchs);
781                         pthread_rwlock_unlock(&evtid->rwlock);
782                 }
783                 pthread_rwlock_unlock(&listener->rwlock);
784
785                 /* free the listener */
786                 pthread_rwlock_destroy(&listener->rwlock);
787                 free(listener);
788         }
789 }
790
791 /*
792  * Makes the 'listener' watching 'evtid'
793  * Returns 0 in case of success or else -1.
794  */
795 int afb_evt_watch_add_evtid(struct afb_evt_listener *listener, struct afb_evtid *evtid)
796 {
797         struct afb_evt_watch *watch;
798
799         /* check parameter */
800         if (listener->itf->push == NULL) {
801                 errno = EINVAL;
802                 return -1;
803         }
804
805         /* search the existing watch for the listener */
806         pthread_rwlock_wrlock(&listener->rwlock);
807         watch = listener->watchs;
808         while(watch != NULL) {
809                 if (watch->evtid == evtid)
810                         goto found;
811                 watch = watch->next_by_listener;
812         }
813
814         /* not found, allocate a new */
815         watch = malloc(sizeof *watch);
816         if (watch == NULL) {
817                 pthread_rwlock_unlock(&listener->rwlock);
818                 errno = ENOMEM;
819                 return -1;
820         }
821
822         /* initialise and link */
823         watch->evtid = evtid;
824         watch->activity = 0;
825         watch->listener = listener;
826         watch->next_by_listener = listener->watchs;
827         listener->watchs = watch;
828         pthread_rwlock_wrlock(&evtid->rwlock);
829         watch->next_by_evtid = evtid->watchs;
830         evtid->watchs = watch;
831         pthread_rwlock_unlock(&evtid->rwlock);
832
833 found:
834         if (watch->activity == 0 && listener->itf->add != NULL)
835                 listener->itf->add(listener->closure, evtid->fullname, evtid->id);
836         watch->activity++;
837         evtid->has_client = 1;
838         pthread_rwlock_unlock(&listener->rwlock);
839
840         return 0;
841 }
842
843 /*
844  * Avoids the 'listener' to watch 'evtid'
845  * Returns 0 in case of success or else -1.
846  */
847 int afb_evt_watch_sub_evtid(struct afb_evt_listener *listener, struct afb_evtid *evtid)
848 {
849         struct afb_evt_watch *watch;
850
851         /* search the existing watch */
852         pthread_rwlock_wrlock(&listener->rwlock);
853         watch = listener->watchs;
854         while(watch != NULL) {
855                 if (watch->evtid == evtid) {
856                         if (watch->activity != 0) {
857                                 watch->activity--;
858                                 if (watch->activity == 0 && listener->itf->remove != NULL)
859                                         listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
860                         }
861                         pthread_rwlock_unlock(&listener->rwlock);
862                         return 0;
863                 }
864                 watch = watch->next_by_listener;
865         }
866         pthread_rwlock_unlock(&listener->rwlock);
867         errno = ENOENT;
868         return -1;
869 }
870
871 /*
872  * update the hooks for events
873  */
874 void afb_evt_update_hooks()
875 {
876         struct afb_evtid *evtid;
877
878         pthread_rwlock_rdlock(&events_rwlock);
879         for (evtid = evtids ; evtid ; evtid = evtid->next) {
880                 evtid->hookflags = afb_hook_flags_evt(evtid->fullname);
881                 evtid->eventid.itf = evtid->hookflags ? &afb_evt_hooked_eventid_itf : &afb_evt_event_x2_itf;
882         }
883         pthread_rwlock_unlock(&events_rwlock);
884 }
885
886 inline struct afb_evtid *afb_evt_event_x2_to_evtid(struct afb_event_x2 *eventid)
887 {
888         return (struct afb_evtid*)eventid;
889 }
890
891 inline struct afb_event_x2 *afb_evt_event_x2_from_evtid(struct afb_evtid *evtid)
892 {
893         return &evtid->eventid;
894 }
895
896 /*
897  * Creates an event of 'fullname' and returns it.
898  * Returns an event with closure==NULL in case of error.
899  */
900 struct afb_event_x2 *afb_evt_event_x2_create(const char *fullname)
901 {
902         return afb_evt_event_x2_from_evtid(afb_evt_evtid_create(fullname));
903 }
904
905 /*
906  * Creates an event of name 'prefix'/'name' and returns it.
907  * Returns an event with closure==NULL in case of error.
908  */
909 struct afb_event_x2 *afb_evt_event_x2_create2(const char *prefix, const char *name)
910 {
911         return afb_evt_event_x2_from_evtid(afb_evt_evtid_create2(prefix, name));
912 }
913
914 /*
915  * Returns the fullname of the 'eventid'
916  */
917 const char *afb_evt_event_x2_fullname(struct afb_event_x2 *eventid)
918 {
919         struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
920         return evtid ? evtid->fullname : NULL;
921 }
922
923 /*
924  * Returns the id of the 'eventid'
925  */
926 int afb_evt_event_x2_id(struct afb_event_x2 *eventid)
927 {
928         struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
929         return evtid ? evtid->id : 0;
930 }
931
932 /*
933  * Makes the 'listener' watching 'eventid'
934  * Returns 0 in case of success or else -1.
935  */
936 int afb_evt_event_x2_add_watch(struct afb_evt_listener *listener, struct afb_event_x2 *eventid)
937 {
938         struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
939
940         /* check parameter */
941         if (!evtid) {
942                 errno = EINVAL;
943                 return -1;
944         }
945
946         /* search the existing watch for the listener */
947         return afb_evt_watch_add_evtid(listener, evtid);
948 }
949
950 /*
951  * Avoids the 'listener' to watch 'eventid'
952  * Returns 0 in case of success or else -1.
953  */
954 int afb_evt_event_x2_remove_watch(struct afb_evt_listener *listener, struct afb_event_x2 *eventid)
955 {
956         struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
957
958         /* check parameter */
959         if (!evtid) {
960                 errno = EINVAL;
961                 return -1;
962         }
963
964         /* search the existing watch */
965         return afb_evt_watch_sub_evtid(listener, evtid);
966 }
967
968 int afb_evt_event_x2_push(struct afb_event_x2 *eventid, struct json_object *object)
969 {
970         struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
971         if (evtid)
972                 return afb_evt_evtid_hooked_push(evtid, object);
973         json_object_put(object);
974         return 0;
975 }
976
977 int afb_evt_event_x2_unhooked_push(struct afb_event_x2 *eventid, struct json_object *object)
978 {
979         struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
980         if (evtid)
981                 return afb_evt_evtid_push(evtid, object);
982         json_object_put(object);
983         return 0;
984 }
985
986 struct afb_event_x1 afb_evt_event_from_evtid(struct afb_evtid *evtid)
987 {
988         return evtid
989                 ? (struct afb_event_x1){ .itf = &afb_evt_hooked_eventid_itf, .closure = &evtid->eventid }
990                 : (struct afb_event_x1){ .itf = NULL, .closure = NULL };
991 }
992
993 void afb_evt_event_x2_unref(struct afb_event_x2 *eventid)
994 {
995         struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
996         if (evtid)
997                 afb_evt_evtid_unref(evtid);
998 }
999
1000 struct afb_event_x2 *afb_evt_event_x2_addref(struct afb_event_x2 *eventid)
1001 {
1002         struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
1003         if (evtid)
1004                 afb_evt_evtid_addref(evtid);
1005         return eventid;
1006 }
1007