2 * Copyright (C) 2015, 2016, 2017 "IoT.bzh"
3 * Author "Fulup Ar Foll"
4 * Author José Bollo <jose.bollo@iot.bzh>
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
27 #include <json-c/json.h>
28 #include <afb/afb-event-itf.h>
37 * Structure for event listeners
39 struct afb_evt_listener {
41 /* chaining listeners */
42 struct afb_evt_listener *next;
44 /* interface for callbacks */
45 const struct afb_evt_itf *itf;
47 /* closure for the callback */
50 /* head of the list of events listened */
51 struct afb_evt_watch *watchs;
53 /* mutex of the listener */
54 pthread_mutex_t mutex;
56 /* count of reference to the listener */
61 * Structure for describing events
66 struct afb_event_itf *itf;
69 struct afb_evtid *next;
71 /* head of the list of listeners watching the event */
72 struct afb_evt_watch *watchs;
83 /* mutex of the event */
84 pthread_mutex_t mutex;
86 /* fullname of the event */
91 * Structure for associating events and listeners
93 struct afb_evt_watch {
96 struct afb_evtid *evtid;
98 /* link to the next watcher for the same evtid */
99 struct afb_evt_watch *next_by_evtid;
102 struct afb_evt_listener *listener;
104 /* link to the next watcher for the same listener */
105 struct afb_evt_watch *next_by_listener;
111 /* declare functions */
112 static void evt_hooked_drop(struct afb_evtid *evtid);
114 /* the interface for events */
115 static struct afb_event_itf afb_evt_event_itf = {
116 .broadcast = (void*)afb_evt_evtid_broadcast,
117 .push = (void*)afb_evt_evtid_push,
118 .drop = (void*)afb_evt_evtid_unref,
119 .name = (void*)afb_evt_evtid_name
122 /* the interface for events */
123 static struct afb_event_itf afb_evt_hooked_event_itf = {
124 .broadcast = (void*)afb_evt_evtid_hooked_broadcast,
125 .push = (void*)afb_evt_evtid_hooked_push,
126 .drop = (void*)evt_hooked_drop,
127 .name = (void*)afb_evt_evtid_hooked_name
130 /* head of the list of listeners */
131 static pthread_mutex_t listeners_mutex = PTHREAD_MUTEX_INITIALIZER;
132 static struct afb_evt_listener *listeners = NULL;
134 /* handling id of events */
135 static pthread_mutex_t events_mutex = PTHREAD_MUTEX_INITIALIZER;
136 static struct afb_evtid *evtids = NULL;
137 static int event_id_counter = 0;
138 static int event_id_wrapped = 0;
141 * Broadcasts the 'event' of 'id' with its 'obj'
142 * 'obj' is released (like json_object_put)
143 * Returns the count of listener having receive the event.
145 static int broadcast(const char *event, struct json_object *obj, int id)
148 struct afb_evt_listener *listener;
151 pthread_mutex_lock(&listeners_mutex);
152 listener = listeners;
154 if (listener->itf->broadcast != NULL) {
155 listener->itf->broadcast(listener->closure, event, id, json_object_get(obj));
158 listener = listener->next;
160 pthread_mutex_unlock(&listeners_mutex);
161 json_object_put(obj);
166 * Broadcasts the 'event' of 'id' with its 'obj'
167 * 'obj' is released (like json_object_put)
168 * calls hooks if hookflags isn't 0
169 * Returns the count of listener having receive the event.
171 static int hooked_broadcast(const char *event, struct json_object *obj, int id, int hookflags)
175 json_object_get(obj);
177 if (hookflags & afb_hook_flag_evt_broadcast_before)
178 afb_hook_evt_broadcast_before(event, id, obj);
180 result = broadcast(event, obj, id);
182 if (hookflags & afb_hook_flag_evt_broadcast_after)
183 afb_hook_evt_broadcast_after(event, id, obj, result);
185 json_object_put(obj);
191 * Broadcasts the event 'evtid' with its 'object'
192 * 'object' is released (like json_object_put)
193 * Returns the count of listener that received the event.
195 int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object)
197 return broadcast(evtid->fullname, object, evtid->id);
201 * Broadcasts the event 'evtid' with its 'object'
202 * 'object' is released (like json_object_put)
203 * Returns the count of listener that received the event.
205 int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *object)
207 return hooked_broadcast(evtid->fullname, object, evtid->id, evtid->hookflags);
211 * Broadcasts the 'event' with its 'object'
212 * 'object' is released (like json_object_put)
213 * Returns the count of listener having receive the event.
215 int afb_evt_broadcast(const char *event, struct json_object *object)
217 return hooked_broadcast(event, object, 0, -1);
221 * Pushes the event 'evtid' with 'obj' to its listeners
222 * 'obj' is released (like json_object_put)
223 * Returns the count of listener that received the event.
225 int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *obj)
228 struct afb_evt_watch *watch;
229 struct afb_evt_listener *listener;
232 pthread_mutex_lock(&evtid->mutex);
233 watch = evtid->watchs;
235 listener = watch->listener;
236 assert(listener->itf->push != NULL);
237 if (watch->activity != 0) {
238 listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(obj));
241 watch = watch->next_by_evtid;
243 pthread_mutex_unlock(&evtid->mutex);
244 json_object_put(obj);
249 * Pushes the event 'evtid' with 'obj' to its listeners
250 * 'obj' is released (like json_object_put)
251 * Emits calls to hooks.
252 * Returns the count of listener taht received the event.
254 int afb_evt_evtid_hooked_push(struct afb_evtid *evtid, struct json_object *obj)
258 /* lease the object */
259 json_object_get(obj);
261 /* hook before push */
262 if (evtid->hookflags & afb_hook_flag_evt_push_before)
263 afb_hook_evt_push_before(evtid->fullname, evtid->id, obj);
266 result = afb_evt_evtid_push(evtid, obj);
268 /* hook after push */
269 if (evtid->hookflags & afb_hook_flag_evt_push_after)
270 afb_hook_evt_push_after(evtid->fullname, evtid->id, obj, result);
272 /* release the object */
273 json_object_put(obj);
280 static void remove_watch(struct afb_evt_watch *watch)
282 struct afb_evt_watch **prv;
283 struct afb_evtid *evtid;
284 struct afb_evt_listener *listener;
286 /* notify listener if needed */
287 evtid = watch->evtid;
288 listener = watch->listener;
289 if (watch->activity != 0 && listener->itf->remove != NULL)
290 listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
292 /* unlink the watch for its event */
293 prv = &evtid->watchs;
295 prv = &(*prv)->next_by_evtid;
296 *prv = watch->next_by_evtid;
298 /* unlink the watch for its listener */
299 prv = &listener->watchs;
301 prv = &(*prv)->next_by_listener;
302 *prv = watch->next_by_listener;
309 * Creates an event of name 'fullname' and returns it or NULL on error.
311 struct afb_evtid *afb_evt_evtid_create(const char *fullname)
314 struct afb_evtid *evtid, *oevt;
316 /* allocates the event */
317 len = strlen(fullname);
318 evtid = malloc(len + sizeof * evtid);
322 /* allocates the id */
323 pthread_mutex_lock(&events_mutex);
325 if (++event_id_counter < 0) {
326 event_id_wrapped = 1;
327 event_id_counter = 1024; /* heuristic: small numbers are not destroyed */
329 if (!event_id_wrapped)
332 while(oevt != NULL && oevt->id != event_id_counter)
334 } while (oevt != NULL);
336 /* initialize the event */
337 memcpy(evtid->fullname, fullname, len + 1);
338 evtid->next = evtids;
339 evtid->watchs = NULL;
340 evtid->id = event_id_counter;
341 pthread_mutex_init(&evtid->mutex, NULL);
343 evtid->hookflags = afb_hook_flags_evt(evtid->fullname);
344 evtid->itf = evtid->hookflags ? &afb_evt_hooked_event_itf : &afb_evt_event_itf;
345 if (evtid->hookflags & afb_hook_flag_evt_create)
346 afb_hook_evt_create(evtid->fullname, evtid->id);
347 pthread_mutex_unlock(&events_mutex);
349 /* returns the event */
356 * increment the reference count of the event 'evtid'
358 struct afb_evtid *afb_evt_evtid_addref(struct afb_evtid *evtid)
360 __atomic_add_fetch(&evtid->refcount, 1, __ATOMIC_RELAXED);
365 * increment the reference count of the event 'evtid'
367 struct afb_evtid *afb_evt_evtid_hooked_addref(struct afb_evtid *evtid)
369 if (evtid->hookflags & afb_hook_flag_evt_addref)
370 afb_hook_evt_addref(evtid->fullname, evtid->id);
371 return afb_evt_evtid_addref(evtid);
375 * decrement the reference count of the event 'evtid'
376 * and destroy it when the count reachs zero
378 void afb_evt_evtid_unref(struct afb_evtid *evtid)
381 struct afb_evtid **prv;
382 struct afb_evt_listener *listener;
384 if (!__atomic_sub_fetch(&evtid->refcount, 1, __ATOMIC_RELAXED)) {
385 /* unlinks the event if valid! */
386 pthread_mutex_lock(&events_mutex);
389 while (*prv && !(found = (*prv == evtid)))
393 pthread_mutex_unlock(&events_mutex);
395 /* destroys the event */
397 ERROR("event not found");
399 /* removes all watchers */
400 while(evtid->watchs != NULL) {
401 listener = evtid->watchs->listener;
402 pthread_mutex_lock(&listener->mutex);
403 pthread_mutex_lock(&evtid->mutex);
404 remove_watch(evtid->watchs);
405 pthread_mutex_unlock(&evtid->mutex);
406 pthread_mutex_unlock(&listener->mutex);
410 if (evtid->hookflags & afb_hook_flag_evt_drop)
411 afb_hook_evt_drop(evtid->fullname, evtid->id);
414 pthread_mutex_destroy(&evtid->mutex);
421 * decrement the reference count of the event 'evtid'
422 * and destroy it when the count reachs zero
424 void afb_evt_evtid_hooked_unref(struct afb_evtid *evtid)
426 if (evtid->hookflags & afb_hook_flag_evt_unref)
427 afb_hook_evt_unref(evtid->fullname, evtid->id);
428 afb_evt_evtid_unref(evtid);
431 static void evt_hooked_drop(struct afb_evtid *evtid)
433 if (evtid->hookflags & afb_hook_flag_evt_drop)
434 afb_hook_evt_drop(evtid->fullname, evtid->id);
435 afb_evt_evtid_unref(evtid);
439 * Returns the true name of the 'event'
441 const char *afb_evt_evtid_fullname(struct afb_evtid *evtid)
443 return evtid->fullname;
447 * Returns the name of the 'event'
449 const char *afb_evt_evtid_name(struct afb_evtid *evtid)
451 const char *name = strchr(evtid->fullname, '/');
452 return name ? name + 1 : evtid->fullname;
456 * Returns the name associated to the event 'evtid'.
458 const char *afb_evt_evtid_hooked_name(struct afb_evtid *evtid)
460 const char *result = afb_evt_evtid_name(evtid);
461 if (evtid->hookflags & afb_hook_flag_evt_name)
462 afb_hook_evt_name(evtid->fullname, evtid->id, result);
467 * Returns the id of the 'event'
469 int afb_evt_evtid_id(struct afb_evtid *evtid)
475 * Returns an instance of the listener defined by the 'send' callback
477 * Returns NULL in case of memory depletion.
479 struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, void *closure)
481 struct afb_evt_listener *listener;
483 /* search if an instance already exists */
484 pthread_mutex_lock(&listeners_mutex);
485 listener = listeners;
486 while (listener != NULL) {
487 if (listener->itf == itf && listener->closure == closure) {
488 listener = afb_evt_listener_addref(listener);
491 listener = listener->next;
495 listener = calloc(1, sizeof *listener);
496 if (listener != NULL) {
499 listener->closure = closure;
500 listener->watchs = NULL;
501 listener->refcount = 1;
502 pthread_mutex_init(&listener->mutex, NULL);
503 listener->next = listeners;
504 listeners = listener;
507 pthread_mutex_unlock(&listeners_mutex);
512 * Increases the reference count of 'listener' and returns it
514 struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener)
516 __atomic_add_fetch(&listener->refcount, 1, __ATOMIC_RELAXED);
521 * Decreases the reference count of the 'listener' and destroys it
524 void afb_evt_listener_unref(struct afb_evt_listener *listener)
526 struct afb_evt_listener **prv;
527 struct afb_evtid *evtid;
529 if (!__atomic_sub_fetch(&listener->refcount, 1, __ATOMIC_RELAXED)) {
531 /* unlink the listener */
532 pthread_mutex_lock(&listeners_mutex);
534 while (*prv != listener)
536 *prv = listener->next;
537 pthread_mutex_unlock(&listeners_mutex);
539 /* remove the watchers */
540 pthread_mutex_lock(&listener->mutex);
541 while (listener->watchs != NULL) {
542 evtid = listener->watchs->evtid;
543 pthread_mutex_lock(&evtid->mutex);
544 remove_watch(listener->watchs);
545 pthread_mutex_unlock(&evtid->mutex);
547 pthread_mutex_unlock(&listener->mutex);
549 /* free the listener */
550 pthread_mutex_destroy(&listener->mutex);
556 * Makes the 'listener' watching 'evtid'
557 * Returns 0 in case of success or else -1.
559 int afb_evt_watch_add_evtid(struct afb_evt_listener *listener, struct afb_evtid *evtid)
561 struct afb_evt_watch *watch;
563 /* check parameter */
564 if (listener->itf->push == NULL) {
569 /* search the existing watch for the listener */
570 pthread_mutex_lock(&listener->mutex);
571 watch = listener->watchs;
572 while(watch != NULL) {
573 if (watch->evtid == evtid)
575 watch = watch->next_by_listener;
578 /* not found, allocate a new */
579 watch = malloc(sizeof *watch);
581 pthread_mutex_unlock(&listener->mutex);
586 /* initialise and link */
587 watch->evtid = evtid;
589 watch->listener = listener;
590 watch->next_by_listener = listener->watchs;
591 listener->watchs = watch;
592 pthread_mutex_lock(&evtid->mutex);
593 watch->next_by_evtid = evtid->watchs;
594 evtid->watchs = watch;
595 pthread_mutex_unlock(&evtid->mutex);
598 if (watch->activity == 0 && listener->itf->add != NULL)
599 listener->itf->add(listener->closure, evtid->fullname, evtid->id);
601 pthread_mutex_unlock(&listener->mutex);
607 * Avoids the 'listener' to watch 'evtid'
608 * Returns 0 in case of success or else -1.
610 int afb_evt_watch_sub_evtid(struct afb_evt_listener *listener, struct afb_evtid *evtid)
612 struct afb_evt_watch *watch;
614 /* search the existing watch */
615 pthread_mutex_lock(&listener->mutex);
616 watch = listener->watchs;
617 while(watch != NULL) {
618 if (watch->evtid == evtid) {
619 if (watch->activity != 0) {
621 if (watch->activity == 0 && listener->itf->remove != NULL)
622 listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
624 pthread_mutex_unlock(&listener->mutex);
627 watch = watch->next_by_listener;
629 pthread_mutex_unlock(&listener->mutex);
635 * update the hooks for events
637 void afb_evt_update_hooks()
639 struct afb_evtid *evtid;
641 pthread_mutex_lock(&events_mutex);
642 for (evtid = evtids ; evtid ; evtid = evtid->next) {
643 evtid->hookflags = afb_hook_flags_evt(evtid->fullname);
644 evtid->itf = evtid->hookflags ? &afb_evt_hooked_event_itf : &afb_evt_event_itf;
646 pthread_mutex_unlock(&events_mutex);
649 struct afb_evtid *afb_evt_to_evtid(struct afb_event event)
651 return (struct afb_evtid*)(event.itf == &afb_evt_hooked_event_itf ? event.closure : NULL);
654 struct afb_event afb_evt_from_evtid(struct afb_evtid *evtid)
656 return (struct afb_event){ .itf = evtid ? &afb_evt_hooked_event_itf : NULL, .closure = evtid };
660 * Creates an event of 'fullname' and returns it.
661 * Returns an event with closure==NULL in case of error.
663 struct afb_event afb_evt_create_event(const char *fullname)
665 return afb_evt_from_evtid(afb_evt_evtid_create(fullname));
669 * Returns the fullname of the 'event'
671 const char *afb_evt_event_fullname(struct afb_event event)
673 struct afb_evtid *evtid = afb_evt_to_evtid(event);
674 return evtid ? evtid->fullname : NULL;
678 * Returns the id of the 'event'
680 int afb_evt_event_id(struct afb_event event)
682 struct afb_evtid *evtid = afb_evt_to_evtid(event);
683 return evtid ? evtid->id : 0;
687 * Makes the 'listener' watching 'event'
688 * Returns 0 in case of success or else -1.
690 int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event)
692 struct afb_evtid *evtid = afb_evt_to_evtid(event);
694 /* check parameter */
700 /* search the existing watch for the listener */
701 return afb_evt_watch_add_evtid(listener, evtid);
705 * Avoids the 'listener' to watch 'event'
706 * Returns 0 in case of success or else -1.
708 int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event event)
710 struct afb_evtid *evtid = afb_evt_to_evtid(event);
712 /* check parameter */
718 /* search the existing watch */
719 return afb_evt_watch_sub_evtid(listener, evtid);
722 int afb_evt_push(struct afb_event event, struct json_object *object)
724 struct afb_evtid *evtid = afb_evt_to_evtid(event);
726 return afb_evt_evtid_hooked_push(evtid, object);
727 json_object_put(object);
731 int afb_evt_unhooked_push(struct afb_event event, struct json_object *object)
733 struct afb_evtid *evtid = afb_evt_to_evtid(event);
735 return afb_evt_evtid_push(evtid, object);
736 json_object_put(object);