afb-proto-ws: Add message for unexpected event
[src/app-framework-binder.git] / src / afb-evt.c
1 /*
2  * Copyright (C) 2015-2019 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <string.h>
22 #include <assert.h>
23 #include <errno.h>
24 #include <pthread.h>
25
26 #include <json-c/json.h>
27 #include <afb/afb-event-x2-itf.h>
28 #include <afb/afb-event-x1.h>
29
30 #include "afb-evt.h"
31 #include "afb-hook.h"
32 #include "verbose.h"
33 #include "jobs.h"
34 #include "uuid.h"
35
36 struct afb_evt_watch;
37
38 /*
39  * Structure for event listeners
40  */
41 struct afb_evt_listener {
42
43         /* chaining listeners */
44         struct afb_evt_listener *next;
45
46         /* interface for callbacks */
47         const struct afb_evt_itf *itf;
48
49         /* closure for the callback */
50         void *closure;
51
52         /* head of the list of events listened */
53         struct afb_evt_watch *watchs;
54
55         /* rwlock of the listener */
56         pthread_rwlock_t rwlock;
57
58         /* count of reference to the listener */
59         uint16_t refcount;
60 };
61
62 /*
63  * Structure for describing events
64  */
65 struct afb_evtid {
66
67         /* interface */
68         struct afb_event_x2 eventid;
69
70         /* next event */
71         struct afb_evtid *next;
72
73         /* head of the list of listeners watching the event */
74         struct afb_evt_watch *watchs;
75
76         /* rwlock of the event */
77         pthread_rwlock_t rwlock;
78
79 #if WITH_AFB_HOOK
80         /* hooking */
81         int hookflags;
82 #endif
83
84         /* refcount */
85         uint16_t refcount;
86
87         /* id of the event */
88         uint16_t id;
89
90         /* has client? */
91         int has_client;
92
93         /* fullname of the event */
94         char fullname[];
95 };
96
97 /*
98  * Structure for associating events and listeners
99  */
100 struct afb_evt_watch {
101
102         /* the evtid */
103         struct afb_evtid *evtid;
104
105         /* link to the next watcher for the same evtid */
106         struct afb_evt_watch *next_by_evtid;
107
108         /* the listener */
109         struct afb_evt_listener *listener;
110
111         /* link to the next watcher for the same listener */
112         struct afb_evt_watch *next_by_listener;
113
114         /* activity */
115         unsigned activity;
116 };
117
118 /*
119  * structure for job of broadcasting events
120  */
121 struct job_broadcast
122 {
123         /** object atached to the event */
124         struct json_object *object;
125
126         /** the uuid of the event */
127         uuid_binary_t  uuid;
128
129         /** remaining hop */
130         uint8_t hop;
131
132         /** name of the event to broadcast */
133         char event[];
134 };
135
136 /*
137  * structure for job of broadcasting or pushing events
138  */
139 struct job_evtid
140 {
141         /** the event to broadcast */
142         struct afb_evtid *evtid;
143
144         /** object atached to the event */
145         struct json_object *object;
146 };
147
148 /* the interface for events */
149 static struct afb_event_x2_itf afb_evt_event_x2_itf = {
150         .broadcast = (void*)afb_evt_evtid_broadcast,
151         .push = (void*)afb_evt_evtid_push,
152         .unref = (void*)afb_evt_evtid_unref,
153         .name = (void*)afb_evt_evtid_name,
154         .addref = (void*)afb_evt_evtid_addref
155 };
156
157 #if WITH_AFB_HOOK
158 /* the interface for events */
159 static struct afb_event_x2_itf afb_evt_hooked_event_x2_itf = {
160         .broadcast = (void*)afb_evt_evtid_hooked_broadcast,
161         .push = (void*)afb_evt_evtid_hooked_push,
162         .unref = (void*)afb_evt_evtid_hooked_unref,
163         .name = (void*)afb_evt_evtid_hooked_name,
164         .addref = (void*)afb_evt_evtid_hooked_addref
165 };
166 #endif
167
168 /* job groups for events push/broadcast */
169 #define BROADCAST_JOB_GROUP  (&afb_evt_event_x2_itf)
170 #define PUSH_JOB_GROUP       (&afb_evt_event_x2_itf)
171
172 /* head of the list of listeners */
173 static pthread_rwlock_t listeners_rwlock = PTHREAD_RWLOCK_INITIALIZER;
174 static struct afb_evt_listener *listeners = NULL;
175
176 /* handling id of events */
177 static pthread_rwlock_t events_rwlock = PTHREAD_RWLOCK_INITIALIZER;
178 static struct afb_evtid *evtids = NULL;
179 static uint16_t event_genid = 0;
180 static uint16_t event_count = 0;
181
182 /* head of uniqueness of events */
183 #if !defined(EVENT_BROADCAST_HOP_MAX)
184 #  define EVENT_BROADCAST_HOP_MAX  10
185 #endif
186 #if !defined(EVENT_BROADCAST_MEMORY_COUNT)
187 #  define EVENT_BROADCAST_MEMORY_COUNT  8
188 #endif
189
190 #if EVENT_BROADCAST_MEMORY_COUNT
191 static struct {
192         pthread_mutex_t mutex;
193         uint8_t base;
194         uint8_t count;
195         uuid_binary_t uuids[EVENT_BROADCAST_MEMORY_COUNT];
196 } uniqueness = {
197         .mutex = PTHREAD_MUTEX_INITIALIZER,
198         .base = 0,
199         .count = 0
200 };
201 #endif
202
203 /*
204  * Create structure for job of broadcasting string 'event' with 'object'
205  * Returns the created structure or NULL if out of memory
206  */
207 static struct job_broadcast *make_job_broadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop)
208 {
209         size_t sz = 1 + strlen(event);
210         struct job_broadcast *jb = malloc(sz + sizeof *jb);
211         if (jb) {
212                 jb->object = object;
213                 memcpy(jb->uuid, uuid, sizeof jb->uuid);
214                 jb->hop = hop;
215                 memcpy(jb->event, event, sz);
216         }
217         return jb;
218 }
219
220 /*
221  * Destroy structure 'jb' for job of broadcasting string events
222  */
223 static void destroy_job_broadcast(struct job_broadcast *jb)
224 {
225         json_object_put(jb->object);
226         free(jb);
227 }
228
229 /*
230  * Create structure for job of broadcasting or pushing 'evtid' with 'object'
231  * Returns the created structure or NULL if out of memory
232  */
233 static struct job_evtid *make_job_evtid(struct afb_evtid *evtid, struct json_object *object)
234 {
235         struct job_evtid *je = malloc(sizeof *je);
236         if (je) {
237                 je->evtid = afb_evt_evtid_addref(evtid);
238                 je->object = object;
239         }
240         return je;
241 }
242
243 /*
244  * Destroy structure for job of broadcasting or pushing evtid
245  */
246 static void destroy_job_evtid(struct job_evtid *je)
247 {
248         afb_evt_evtid_unref(je->evtid);
249         json_object_put(je->object);
250         free(je);
251 }
252
253 /*
254  * Broadcasts the 'event' of 'id' with its 'object'
255  */
256 static void broadcast(struct job_broadcast *jb)
257 {
258         struct afb_evt_listener *listener;
259
260         pthread_rwlock_rdlock(&listeners_rwlock);
261         listener = listeners;
262         while(listener) {
263                 if (listener->itf->broadcast != NULL)
264                         listener->itf->broadcast(listener->closure, jb->event, json_object_get(jb->object), jb->uuid, jb->hop);
265                 listener = listener->next;
266         }
267         pthread_rwlock_unlock(&listeners_rwlock);
268 }
269
270 /*
271  * Jobs callback for broadcasting string asynchronously
272  */
273 static void broadcast_job(int signum, void *closure)
274 {
275         struct job_broadcast *jb = closure;
276
277         if (signum == 0)
278                 broadcast(jb);
279         destroy_job_broadcast(jb);
280 }
281
282 /*
283  * Broadcasts the string 'event' with its 'object'
284  */
285 static int unhooked_broadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop)
286 {
287         uuid_binary_t local_uuid;
288         struct job_broadcast *jb;
289         int rc;
290 #if EVENT_BROADCAST_MEMORY_COUNT
291         int iter, count;
292 #endif
293
294         /* check if lately sent */
295         if (!uuid) {
296                 uuid_new_binary(local_uuid);
297                 uuid = local_uuid;
298                 hop = EVENT_BROADCAST_HOP_MAX;
299 #if EVENT_BROADCAST_MEMORY_COUNT
300                 pthread_mutex_lock(&uniqueness.mutex);
301         } else {
302                 pthread_mutex_lock(&uniqueness.mutex);
303                 iter = (int)uniqueness.base;
304                 count = (int)uniqueness.count;
305                 while (count) {
306                         if (0 == memcmp(uuid, uniqueness.uuids[iter], sizeof(uuid_binary_t))) {
307                                 pthread_mutex_unlock(&uniqueness.mutex);
308                                 return 0;
309                         }
310                         if (++iter == EVENT_BROADCAST_MEMORY_COUNT)
311                                 iter = 0;
312                         count--;
313                 }
314         }
315         iter = (int)uniqueness.base;
316         if (uniqueness.count < EVENT_BROADCAST_MEMORY_COUNT)
317                 iter += (int)(uniqueness.count++);
318         else if (++uniqueness.base == EVENT_BROADCAST_MEMORY_COUNT)
319                 uniqueness.base = 0;
320         memcpy(uniqueness.uuids[iter], uuid, sizeof(uuid_binary_t));
321         pthread_mutex_unlock(&uniqueness.mutex);
322 #else
323         }
324 #endif
325
326         /* create the structure for the job */
327         jb = make_job_broadcast(event, object, uuid, hop);
328         if (jb == NULL) {
329                 ERROR("Cant't create broadcast string job item for %s(%s)",
330                         event, json_object_to_json_string(object));
331                 json_object_put(object);
332                 return -1;
333         }
334
335         /* queue the job */
336         rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job, jb);
337         if (rc) {
338                 ERROR("cant't queue broadcast string job item for %s(%s)",
339                         event, json_object_to_json_string(object));
340                 destroy_job_broadcast(jb);
341         }
342         return rc;
343 }
344
345 /*
346  * Broadcasts the event 'evtid' with its 'object'
347  * 'object' is released (like json_object_put)
348  * Returns the count of listener that received the event.
349  */
350 int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object)
351 {
352         return unhooked_broadcast(evtid->fullname, object, NULL, 0);
353 }
354
355 #if WITH_AFB_HOOK
356 /*
357  * Broadcasts the event 'evtid' with its 'object'
358  * 'object' is released (like json_object_put)
359  * Returns the count of listener that received the event.
360  */
361 int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *object)
362 {
363         int result;
364
365         json_object_get(object);
366
367         if (evtid->hookflags & afb_hook_flag_evt_broadcast_before)
368                 afb_hook_evt_broadcast_before(evtid->fullname, evtid->id, object);
369
370         result = afb_evt_evtid_broadcast(evtid, object);
371
372         if (evtid->hookflags & afb_hook_flag_evt_broadcast_after)
373                 afb_hook_evt_broadcast_after(evtid->fullname, evtid->id, object, result);
374
375         json_object_put(object);
376
377         return result;
378 }
379 #endif
380
381 int afb_evt_rebroadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop)
382 {
383         int result;
384
385 #if WITH_AFB_HOOK
386         json_object_get(object);
387         afb_hook_evt_broadcast_before(event, 0, object);
388 #endif
389
390         result = unhooked_broadcast(event, object, uuid, hop);
391
392 #if WITH_AFB_HOOK
393         afb_hook_evt_broadcast_after(event, 0, object, result);
394         json_object_put(object);
395 #endif
396         return result;
397 }
398
399 /*
400  * Broadcasts the 'event' with its 'object'
401  * 'object' is released (like json_object_put)
402  * Returns the count of listener having receive the event.
403  */
404 int afb_evt_broadcast(const char *event, struct json_object *object)
405 {
406         return afb_evt_rebroadcast(event, object, NULL, 0);
407 }
408
409 /*
410  * Pushes the event 'evtid' with 'obj' to its listeners
411  * Returns the count of listener that received the event.
412  */
413 static void push_evtid(struct afb_evtid *evtid, struct json_object *object)
414 {
415         int has_client;
416         struct afb_evt_watch *watch;
417         struct afb_evt_listener *listener;
418
419         has_client = 0;
420         pthread_rwlock_rdlock(&evtid->rwlock);
421         watch = evtid->watchs;
422         while(watch) {
423                 listener = watch->listener;
424                 assert(listener->itf->push != NULL);
425                 if (watch->activity != 0) {
426                         listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(object));
427                         has_client = 1;
428                 }
429                 watch = watch->next_by_evtid;
430         }
431         evtid->has_client = has_client;
432         pthread_rwlock_unlock(&evtid->rwlock);
433 }
434
435 /*
436  * Jobs callback for pushing evtid asynchronously
437  */
438 static void push_job_evtid(int signum, void *closure)
439 {
440         struct job_evtid *je = closure;
441
442         if (signum == 0)
443                 push_evtid(je->evtid, je->object);
444         destroy_job_evtid(je);
445 }
446
447 /*
448  * Pushes the event 'evtid' with 'obj' to its listeners
449  * 'obj' is released (like json_object_put)
450  * Returns 1 if at least one listener exists or 0 if no listener exists or
451  * -1 in case of error and the event can't be delivered
452  */
453 int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *object)
454 {
455         struct job_evtid *je;
456         int rc;
457
458         je = make_job_evtid(evtid, object);
459         if (je == NULL) {
460                 ERROR("Cant't create push evtid job item for %s(%s)",
461                         evtid->fullname, json_object_to_json_string(object));
462                 json_object_put(object);
463                 return -1;
464         }
465
466         rc = jobs_queue(PUSH_JOB_GROUP, 0, push_job_evtid, je);
467         if (rc == 0)
468                 rc = evtid->has_client;
469         else {
470                 ERROR("cant't queue push evtid job item for %s(%s)",
471                         evtid->fullname, json_object_to_json_string(object));
472                 destroy_job_evtid(je);
473         }
474
475         return rc;
476 }
477
478 #if WITH_AFB_HOOK
479 /*
480  * Pushes the event 'evtid' with 'obj' to its listeners
481  * 'obj' is released (like json_object_put)
482  * Emits calls to hooks.
483  * Returns the count of listener taht received the event.
484  */
485 int afb_evt_evtid_hooked_push(struct afb_evtid *evtid, struct json_object *obj)
486 {
487
488         int result;
489
490         /* lease the object */
491         json_object_get(obj);
492
493         /* hook before push */
494         if (evtid->hookflags & afb_hook_flag_evt_push_before)
495                 afb_hook_evt_push_before(evtid->fullname, evtid->id, obj);
496
497         /* push */
498         result = afb_evt_evtid_push(evtid, obj);
499
500         /* hook after push */
501         if (evtid->hookflags & afb_hook_flag_evt_push_after)
502                 afb_hook_evt_push_after(evtid->fullname, evtid->id, obj, result);
503
504         /* release the object */
505         json_object_put(obj);
506         return result;
507 }
508 #endif
509
510 /*
511  * remove the 'watch'
512  */
513 static void remove_watch(struct afb_evt_watch *watch)
514 {
515         struct afb_evt_watch **prv;
516         struct afb_evtid *evtid;
517         struct afb_evt_listener *listener;
518
519         /* notify listener if needed */
520         evtid = watch->evtid;
521         listener = watch->listener;
522         if (watch->activity != 0 && listener->itf->remove != NULL)
523                 listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
524
525         /* unlink the watch for its event */
526         prv = &evtid->watchs;
527         while(*prv != watch)
528                 prv = &(*prv)->next_by_evtid;
529         *prv = watch->next_by_evtid;
530
531         /* unlink the watch for its listener */
532         prv = &listener->watchs;
533         while(*prv != watch)
534                 prv = &(*prv)->next_by_listener;
535         *prv = watch->next_by_listener;
536
537         /* recycle memory */
538         free(watch);
539 }
540
541 /*
542  * Creates an event of name 'fullname' and returns it or NULL on error.
543  */
544 struct afb_evtid *afb_evt_evtid_create(const char *fullname)
545 {
546         size_t len;
547         struct afb_evtid *evtid, *oevt;
548         uint16_t id;
549
550         /* allocates the event */
551         len = strlen(fullname);
552         evtid = malloc(len + 1 + sizeof * evtid);
553         if (evtid == NULL)
554                 goto error;
555
556         /* allocates the id */
557         pthread_rwlock_wrlock(&events_rwlock);
558         if (event_count == UINT16_MAX) {
559                 pthread_rwlock_unlock(&events_rwlock);
560                 free(evtid);
561                 ERROR("Can't create more events");
562                 return NULL;
563         }
564         event_count++;
565         do {
566                 /* TODO add a guard (counting number of event created) */
567                 id = ++event_genid;
568                 if (!id)
569                         id = event_genid = 1;
570                 oevt = evtids;
571                 while(oevt != NULL && oevt->id != id)
572                         oevt = oevt->next;
573         } while (oevt != NULL);
574
575         /* initialize the event */
576         memcpy(evtid->fullname, fullname, len + 1);
577         evtid->next = evtids;
578         evtid->refcount = 1;
579         evtid->watchs = NULL;
580         evtid->id = id;
581         evtid->has_client = 0;
582         pthread_rwlock_init(&evtid->rwlock, NULL);
583         evtids = evtid;
584 #if WITH_AFB_HOOK
585         evtid->hookflags = afb_hook_flags_evt(evtid->fullname);
586         evtid->eventid.itf = evtid->hookflags ? &afb_evt_hooked_event_x2_itf : &afb_evt_event_x2_itf;
587         if (evtid->hookflags & afb_hook_flag_evt_create)
588                 afb_hook_evt_create(evtid->fullname, evtid->id);
589 #else
590         evtid->eventid.itf = &afb_evt_event_x2_itf;
591 #endif
592         pthread_rwlock_unlock(&events_rwlock);
593
594         /* returns the event */
595         return evtid;
596 error:
597         return NULL;
598 }
599
600 /*
601  * Creates an event of name 'prefix'/'name' and returns it or NULL on error.
602  */
603 struct afb_evtid *afb_evt_evtid_create2(const char *prefix, const char *name)
604 {
605         size_t prelen, postlen;
606         char *fullname;
607
608         /* makes the event fullname */
609         prelen = strlen(prefix);
610         postlen = strlen(name);
611         fullname = alloca(prelen + postlen + 2);
612         memcpy(fullname, prefix, prelen);
613         fullname[prelen] = '/';
614         memcpy(fullname + prelen + 1, name, postlen + 1);
615
616         /* create the event */
617         return afb_evt_evtid_create(fullname);
618 }
619
620 /*
621  * increment the reference count of the event 'evtid'
622  */
623 struct afb_evtid *afb_evt_evtid_addref(struct afb_evtid *evtid)
624 {
625         __atomic_add_fetch(&evtid->refcount, 1, __ATOMIC_RELAXED);
626         return evtid;
627 }
628
629 #if WITH_AFB_HOOK
630 /*
631  * increment the reference count of the event 'evtid'
632  */
633 struct afb_evtid *afb_evt_evtid_hooked_addref(struct afb_evtid *evtid)
634 {
635         if (evtid->hookflags & afb_hook_flag_evt_addref)
636                 afb_hook_evt_addref(evtid->fullname, evtid->id);
637         return afb_evt_evtid_addref(evtid);
638 }
639 #endif
640
641 /*
642  * decrement the reference count of the event 'evtid'
643  * and destroy it when the count reachs zero
644  */
645 void afb_evt_evtid_unref(struct afb_evtid *evtid)
646 {
647         int found;
648         struct afb_evtid **prv;
649         struct afb_evt_listener *listener;
650
651         if (!__atomic_sub_fetch(&evtid->refcount, 1, __ATOMIC_RELAXED)) {
652                 /* unlinks the event if valid! */
653                 pthread_rwlock_wrlock(&events_rwlock);
654                 found = 0;
655                 prv = &evtids;
656                 while (*prv && !(found = (*prv == evtid)))
657                         prv = &(*prv)->next;
658                 if (found) {
659                         *prv = evtid->next;
660                         event_count--;
661                 }
662                 pthread_rwlock_unlock(&events_rwlock);
663
664                 /* destroys the event */
665                 if (!found)
666                         ERROR("event not found");
667                 else {
668                         /* removes all watchers */
669                         while(evtid->watchs != NULL) {
670                                 listener = evtid->watchs->listener;
671                                 pthread_rwlock_wrlock(&listener->rwlock);
672                                 pthread_rwlock_wrlock(&evtid->rwlock);
673                                 remove_watch(evtid->watchs);
674                                 pthread_rwlock_unlock(&evtid->rwlock);
675                                 pthread_rwlock_unlock(&listener->rwlock);
676                         }
677
678                         /* free */
679                         pthread_rwlock_destroy(&evtid->rwlock);
680                         free(evtid);
681                 }
682         }
683 }
684
685 #if WITH_AFB_HOOK
686 /*
687  * decrement the reference count of the event 'evtid'
688  * and destroy it when the count reachs zero
689  */
690 void afb_evt_evtid_hooked_unref(struct afb_evtid *evtid)
691 {
692         if (evtid->hookflags & afb_hook_flag_evt_unref)
693                 afb_hook_evt_unref(evtid->fullname, evtid->id);
694         afb_evt_evtid_unref(evtid);
695 }
696 #endif
697
698 /*
699  * Returns the true name of the 'event'
700  */
701 const char *afb_evt_evtid_fullname(struct afb_evtid *evtid)
702 {
703         return evtid->fullname;
704 }
705
706 /*
707  * Returns the name of the 'event'
708  */
709 const char *afb_evt_evtid_name(struct afb_evtid *evtid)
710 {
711         const char *name = strchr(evtid->fullname, '/');
712         return name ? name + 1 : evtid->fullname;
713 }
714
715 #if WITH_AFB_HOOK
716 /*
717  * Returns the name associated to the event 'evtid'.
718  */
719 const char *afb_evt_evtid_hooked_name(struct afb_evtid *evtid)
720 {
721         const char *result = afb_evt_evtid_name(evtid);
722         if (evtid->hookflags & afb_hook_flag_evt_name)
723                 afb_hook_evt_name(evtid->fullname, evtid->id, result);
724         return result;
725 }
726 #endif
727
728 /*
729  * Returns the id of the 'event'
730  */
731 uint16_t afb_evt_evtid_id(struct afb_evtid *evtid)
732 {
733         return evtid->id;
734 }
735
736 /*
737  * Returns an instance of the listener defined by the 'send' callback
738  * and the 'closure'.
739  * Returns NULL in case of memory depletion.
740  */
741 struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, void *closure)
742 {
743         struct afb_evt_listener *listener;
744
745         /* search if an instance already exists */
746         pthread_rwlock_wrlock(&listeners_rwlock);
747         listener = listeners;
748         while (listener != NULL) {
749                 if (listener->itf == itf && listener->closure == closure) {
750                         listener = afb_evt_listener_addref(listener);
751                         goto found;
752                 }
753                 listener = listener->next;
754         }
755
756         /* allocates */
757         listener = calloc(1, sizeof *listener);
758         if (listener != NULL) {
759                 /* init */
760                 listener->itf = itf;
761                 listener->closure = closure;
762                 listener->watchs = NULL;
763                 listener->refcount = 1;
764                 pthread_rwlock_init(&listener->rwlock, NULL);
765                 listener->next = listeners;
766                 listeners = listener;
767         }
768  found:
769         pthread_rwlock_unlock(&listeners_rwlock);
770         return listener;
771 }
772
773 /*
774  * Increases the reference count of 'listener' and returns it
775  */
776 struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener)
777 {
778         __atomic_add_fetch(&listener->refcount, 1, __ATOMIC_RELAXED);
779         return listener;
780 }
781
782 /*
783  * Decreases the reference count of the 'listener' and destroys it
784  * when no more used.
785  */
786 void afb_evt_listener_unref(struct afb_evt_listener *listener)
787 {
788         struct afb_evt_listener **prv;
789         struct afb_evtid *evtid;
790
791         if (listener && !__atomic_sub_fetch(&listener->refcount, 1, __ATOMIC_RELAXED)) {
792
793                 /* unlink the listener */
794                 pthread_rwlock_wrlock(&listeners_rwlock);
795                 prv = &listeners;
796                 while (*prv != listener)
797                         prv = &(*prv)->next;
798                 *prv = listener->next;
799                 pthread_rwlock_unlock(&listeners_rwlock);
800
801                 /* remove the watchers */
802                 pthread_rwlock_wrlock(&listener->rwlock);
803                 while (listener->watchs != NULL) {
804                         evtid = listener->watchs->evtid;
805                         pthread_rwlock_wrlock(&evtid->rwlock);
806                         remove_watch(listener->watchs);
807                         pthread_rwlock_unlock(&evtid->rwlock);
808                 }
809                 pthread_rwlock_unlock(&listener->rwlock);
810
811                 /* free the listener */
812                 pthread_rwlock_destroy(&listener->rwlock);
813                 free(listener);
814         }
815 }
816
817 /*
818  * Makes the 'listener' watching 'evtid'
819  * Returns 0 in case of success or else -1.
820  */
821 int afb_evt_watch_add_evtid(struct afb_evt_listener *listener, struct afb_evtid *evtid)
822 {
823         struct afb_evt_watch *watch;
824
825         /* check parameter */
826         if (listener->itf->push == NULL) {
827                 errno = EINVAL;
828                 return -1;
829         }
830
831         /* search the existing watch for the listener */
832         pthread_rwlock_wrlock(&listener->rwlock);
833         watch = listener->watchs;
834         while(watch != NULL) {
835                 if (watch->evtid == evtid)
836                         goto found;
837                 watch = watch->next_by_listener;
838         }
839
840         /* not found, allocate a new */
841         watch = malloc(sizeof *watch);
842         if (watch == NULL) {
843                 pthread_rwlock_unlock(&listener->rwlock);
844                 errno = ENOMEM;
845                 return -1;
846         }
847
848         /* initialise and link */
849         watch->evtid = evtid;
850         watch->activity = 0;
851         watch->listener = listener;
852         watch->next_by_listener = listener->watchs;
853         listener->watchs = watch;
854         pthread_rwlock_wrlock(&evtid->rwlock);
855         watch->next_by_evtid = evtid->watchs;
856         evtid->watchs = watch;
857         pthread_rwlock_unlock(&evtid->rwlock);
858
859 found:
860         if (watch->activity == 0 && listener->itf->add != NULL)
861                 listener->itf->add(listener->closure, evtid->fullname, evtid->id);
862         watch->activity++;
863         evtid->has_client = 1;
864         pthread_rwlock_unlock(&listener->rwlock);
865
866         return 0;
867 }
868
869 /*
870  * Avoids the 'listener' to watch 'evtid'
871  * Returns 0 in case of success or else -1.
872  */
873 int afb_evt_watch_sub_evtid(struct afb_evt_listener *listener, struct afb_evtid *evtid)
874 {
875         struct afb_evt_watch *watch;
876
877         /* search the existing watch */
878         pthread_rwlock_wrlock(&listener->rwlock);
879         watch = listener->watchs;
880         while(watch != NULL) {
881                 if (watch->evtid == evtid) {
882                         if (watch->activity != 0) {
883                                 watch->activity--;
884                                 if (watch->activity == 0 && listener->itf->remove != NULL)
885                                         listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
886                         }
887                         pthread_rwlock_unlock(&listener->rwlock);
888                         return 0;
889                 }
890                 watch = watch->next_by_listener;
891         }
892         pthread_rwlock_unlock(&listener->rwlock);
893         errno = ENOENT;
894         return -1;
895 }
896
897 /*
898  * Avoids the 'listener' to watch 'eventid'
899  * Returns 0 in case of success or else -1.
900  */
901 int afb_evt_watch_sub_eventid(struct afb_evt_listener *listener, uint16_t eventid)
902 {
903         struct afb_evt_watch *watch;
904         struct afb_evtid *evtid;
905
906         /* search the existing watch */
907         pthread_rwlock_wrlock(&listener->rwlock);
908         watch = listener->watchs;
909         while(watch != NULL) {
910                 evtid = watch->evtid;
911                 if (evtid->id == eventid) {
912                         if (watch->activity != 0) {
913                                 watch->activity--;
914                                 if (watch->activity == 0 && listener->itf->remove != NULL)
915                                         listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
916                         }
917                         pthread_rwlock_unlock(&listener->rwlock);
918                         return 0;
919                 }
920                 watch = watch->next_by_listener;
921         }
922         pthread_rwlock_unlock(&listener->rwlock);
923         errno = ENOENT;
924         return -1;
925 }
926
927 #if WITH_AFB_HOOK
928 /*
929  * update the hooks for events
930  */
931 void afb_evt_update_hooks()
932 {
933         struct afb_evtid *evtid;
934
935         pthread_rwlock_rdlock(&events_rwlock);
936         for (evtid = evtids ; evtid ; evtid = evtid->next) {
937                 evtid->hookflags = afb_hook_flags_evt(evtid->fullname);
938                 evtid->eventid.itf = evtid->hookflags ? &afb_evt_hooked_event_x2_itf : &afb_evt_event_x2_itf;
939         }
940         pthread_rwlock_unlock(&events_rwlock);
941 }
942 #endif
943
944 inline struct afb_evtid *afb_evt_event_x2_to_evtid(struct afb_event_x2 *eventid)
945 {
946         return (struct afb_evtid*)eventid;
947 }
948
949 inline struct afb_event_x2 *afb_evt_event_x2_from_evtid(struct afb_evtid *evtid)
950 {
951         return &evtid->eventid;
952 }
953
954 /*
955  * Creates an event of 'fullname' and returns it.
956  * Returns an event with closure==NULL in case of error.
957  */
958 struct afb_event_x2 *afb_evt_event_x2_create(const char *fullname)
959 {
960         return afb_evt_event_x2_from_evtid(afb_evt_evtid_create(fullname));
961 }
962
963 /*
964  * Creates an event of name 'prefix'/'name' and returns it.
965  * Returns an event with closure==NULL in case of error.
966  */
967 struct afb_event_x2 *afb_evt_event_x2_create2(const char *prefix, const char *name)
968 {
969         return afb_evt_event_x2_from_evtid(afb_evt_evtid_create2(prefix, name));
970 }
971
972 /*
973  * Returns the fullname of the 'eventid'
974  */
975 const char *afb_evt_event_x2_fullname(struct afb_event_x2 *eventid)
976 {
977         struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
978         return evtid ? evtid->fullname : NULL;
979 }
980
981 /*
982  * Returns the id of the 'eventid'
983  */
984 uint16_t afb_evt_event_x2_id(struct afb_event_x2 *eventid)
985 {
986         struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
987         return evtid ? evtid->id : 0;
988 }
989
990 /*
991  * Makes the 'listener' watching 'eventid'
992  * Returns 0 in case of success or else -1.
993  */
994 int afb_evt_event_x2_add_watch(struct afb_evt_listener *listener, struct afb_event_x2 *eventid)
995 {
996         struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
997
998         /* check parameter */
999         if (!evtid) {
1000                 errno = EINVAL;
1001                 return -1;
1002         }
1003
1004         /* search the existing watch for the listener */
1005         return afb_evt_watch_add_evtid(listener, evtid);
1006 }
1007
1008 /*
1009  * Avoids the 'listener' to watch 'eventid'
1010  * Returns 0 in case of success or else -1.
1011  */
1012 int afb_evt_event_x2_remove_watch(struct afb_evt_listener *listener, struct afb_event_x2 *eventid)
1013 {
1014         struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
1015
1016         /* check parameter */
1017         if (!evtid) {
1018                 errno = EINVAL;
1019                 return -1;
1020         }
1021
1022         /* search the existing watch */
1023         return afb_evt_watch_sub_evtid(listener, evtid);
1024 }
1025
1026 int afb_evt_event_x2_push(struct afb_event_x2 *eventid, struct json_object *object)
1027 #if WITH_AFB_HOOK
1028 {
1029         struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
1030         if (evtid)
1031                 return afb_evt_evtid_hooked_push(evtid, object);
1032         json_object_put(object);
1033         return 0;
1034 }
1035 #else
1036         __attribute__((alias("afb_evt_event_x2_unhooked_push")));
1037 #endif
1038
1039 int afb_evt_event_x2_unhooked_push(struct afb_event_x2 *eventid, struct json_object *object)
1040 {
1041         struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
1042         if (evtid)
1043                 return afb_evt_evtid_push(evtid, object);
1044         json_object_put(object);
1045         return 0;
1046 }
1047
1048 #if WITH_LEGACY_BINDING_V1 || WITH_LEGACY_BINDING_V2
1049 struct afb_event_x1 afb_evt_event_from_evtid(struct afb_evtid *evtid)
1050 {
1051         return evtid
1052 #if WITH_AFB_HOOK
1053                 ? (struct afb_event_x1){ .itf = &afb_evt_hooked_event_x2_itf, .closure = &evtid->eventid }
1054 #else
1055                 ? (struct afb_event_x1){ .itf = &afb_evt_event_x2_itf, .closure = &evtid->eventid }
1056 #endif
1057                 : (struct afb_event_x1){ .itf = NULL, .closure = NULL };
1058 }
1059 #endif
1060
1061 void afb_evt_event_x2_unref(struct afb_event_x2 *eventid)
1062 {
1063         struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
1064         if (evtid)
1065                 afb_evt_evtid_unref(evtid);
1066 }
1067
1068 struct afb_event_x2 *afb_evt_event_x2_addref(struct afb_event_x2 *eventid)
1069 {
1070         struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
1071         if (evtid)
1072                 afb_evt_evtid_addref(evtid);
1073         return eventid;
1074 }
1075