2 * Copyright (C) 2015-2018 "IoT.bzh"
3 * Author José Bollo <jose.bollo@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
26 #include <json-c/json.h>
27 #include <afb/afb-eventid-itf.h>
28 #include <afb/afb-event.h>
37 * Structure for event listeners
/*
 * One receiver of events: a callback interface ('itf') together with the
 * closure handed back to every callback. Listeners are chained in the
 * global 'listeners' list and each one heads its own list of watches.
 * NOTE(review): the member declarations that follow the "closure" and
 * "count of reference" comments, and the closing brace, are not visible
 * in this view; other code in this file uses 'listener->closure' and
 * 'listener->refcount'.
 */
39 struct afb_evt_listener {
41 /* chaining listeners */
42 struct afb_evt_listener *next;
44 /* interface for callbacks */
45 const struct afb_evt_itf *itf;
47 /* closure for the callback */
50 /* head of the list of events listened */
51 struct afb_evt_watch *watchs;
53 /* mutex of the listener */
54 pthread_mutex_t mutex;
56 /* count of reference to the listener */
61 * Structure for describing events
/*
 * Members of the event descriptor.
 * NOTE(review): the 'struct' header line and several members are not
 * visible in this view; other code in this file uses 'hookflags',
 * 'refcount', 'id' and 'fullname' on this structure.
 */
/* public handle of the event; its 'itf' member selects the hooked or plain callback table */
66 struct afb_eventid eventid;
/* link to the next event of the global 'evtids' list */
69 struct afb_evtid *next;
71 /* head of the list of listeners watching the event */
72 struct afb_evt_watch *watchs;
74 /* mutex of the event */
75 pthread_mutex_t mutex;
86 /* fullname of the event */
91 * Structure for associating events and listeners
/*
 * Association of one event with one listener. Each watch is a member of
 * two singly linked lists at once: the list headed by 'evtid->watchs'
 * (through 'next_by_evtid') and the list headed by 'listener->watchs'
 * (through 'next_by_listener').
 * NOTE(review): the 'activity' member used elsewhere in this file and
 * the closing brace are not visible in this view.
 */
93 struct afb_evt_watch {
/* the watched event */
96 struct afb_evtid *evtid;
98 /* link to the next watcher for the same evtid */
99 struct afb_evt_watch *next_by_evtid;
/* the listener that watches */
102 struct afb_evt_listener *listener;
104 /* link to the next watcher for the same listener */
105 struct afb_evt_watch *next_by_listener;
/*
 * Callback table installed in 'eventid.itf' for events whose hook flags
 * are zero: every entry points to the non-hooked implementation.
 * NOTE(review): the (void*) casts presumably paper over the difference
 * between 'struct afb_evtid *' and the parameter type declared by the
 * interface; verify against afb-eventid-itf.h.
 */
111 /* the interface for events */
112 static struct afb_eventid_itf afb_evt_eventid_itf = {
113 .broadcast = (void*)afb_evt_evtid_broadcast,
114 .push = (void*)afb_evt_evtid_push,
115 .unref = (void*)afb_evt_evtid_unref,
116 .name = (void*)afb_evt_evtid_name,
117 .addref = (void*)afb_evt_evtid_addref
/*
 * Callback table installed in 'eventid.itf' for events whose hook flags
 * are not zero: every entry points to the '_hooked_' variant, which
 * calls the hooks in addition to the plain implementation.
 */
120 /* the interface for hooked events */
121 static struct afb_eventid_itf afb_evt_hooked_eventid_itf = {
122 .broadcast = (void*)afb_evt_evtid_hooked_broadcast,
123 .push = (void*)afb_evt_evtid_hooked_push,
124 .unref = (void*)afb_evt_evtid_hooked_unref,
125 .name = (void*)afb_evt_evtid_hooked_name,
126 .addref = (void*)afb_evt_evtid_hooked_addref
129 /* head of the list of listeners, protected by 'listeners_mutex' */
130 static pthread_mutex_t listeners_mutex = PTHREAD_MUTEX_INITIALIZER;
131 static struct afb_evt_listener *listeners = NULL;
133 /* handling id of events: 'evtids' heads the list of events, protected by 'events_mutex' */
134 static pthread_mutex_t events_mutex = PTHREAD_MUTEX_INITIALIZER;
135 static struct afb_evtid *evtids = NULL;
/* last id handed out to an event */
136 static int event_id_counter = 0;
/* set once the counter has wrapped; from then on new ids are checked against existing events */
137 static int event_id_wrapped = 0;
140 * Broadcasts the 'event' of 'id' with its 'obj'
141 * 'obj' is released (like json_object_put)
142 * Returns the count of listeners having received the event.
/*
 * Walks the global list of listeners while holding 'listeners_mutex' and
 * calls the 'broadcast' callback of every listener that defines one.
 * Each callback receives its own reference on 'obj' (json_object_get);
 * the caller's reference is dropped at the end (json_object_put).
 * NOTE(review): the loop header, the result counter and the return
 * statement are not visible in this view.
 */
144 static int broadcast(const char *event, struct json_object *obj, int id)
147 struct afb_evt_listener *listener;
/* the callbacks below are invoked with the list mutex held */
150 pthread_mutex_lock(&listeners_mutex);
151 listener = listeners;
/* listeners without a broadcast callback are skipped */
153 if (listener->itf->broadcast != NULL) {
154 listener->itf->broadcast(listener->closure, event, id, json_object_get(obj));
157 listener = listener->next;
159 pthread_mutex_unlock(&listeners_mutex);
/* release the reference given by the caller */
160 json_object_put(obj);
165 * Broadcasts the 'event' of 'id' with its 'obj'
166 * 'obj' is released (like json_object_put)
167 * calls hooks if hookflags isn't 0
168 * Returns the count of listeners having received the event.
/*
 * Same as 'broadcast' but surrounded by the hooks selected in
 * 'hookflags': the "before" hook runs ahead of the broadcast and the
 * "after" hook receives the result of the broadcast.
 * NOTE(review): the declaration of 'result' and the return statement are
 * not visible in this view.
 */
170 static int hooked_broadcast(const char *event, struct json_object *obj, int id, int hookflags)
/* extra reference: 'broadcast' releases one, and 'obj' is still passed to the after-hook */
174 json_object_get(obj);
176 if (hookflags & afb_hook_flag_evt_broadcast_before)
177 afb_hook_evt_broadcast_before(event, id, obj);
179 result = broadcast(event, obj, id);
181 if (hookflags & afb_hook_flag_evt_broadcast_after)
182 afb_hook_evt_broadcast_after(event, id, obj, result);
/* drop the extra reference taken above */
184 json_object_put(obj);
190 * Broadcasts the event 'evtid' with its 'object'
191 * 'object' is released (like json_object_put)
192 * Returns the count of listener that received the event.
194 int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object)
196 return broadcast(evtid->fullname, object, evtid->id);
200 * Broadcasts the event 'evtid' with its 'object'
201 * 'object' is released (like json_object_put)
202 * Returns the count of listener that received the event.
204 int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *object)
206 return hooked_broadcast(evtid->fullname, object, evtid->id, evtid->hookflags);
/*
 * Broadcasts the event named 'event' with its 'object'.
 * No event descriptor is involved: the id passed on is 0 and the hook
 * flags are -1, i.e. every flag bit is set.
 * The reference on 'object' is consumed (like json_object_put).
 * Returns the value computed by 'hooked_broadcast'.
 */
int afb_evt_broadcast(const char *event, struct json_object *object)
{
	int count;

	count = hooked_broadcast(event, object, 0, -1);
	return count;
}
220 * Pushes the event 'evtid' with 'obj' to its listeners
221 * 'obj' is released (like json_object_put)
222 * Returns the count of listener that received the event.
/*
 * Walks the watches of 'evtid' while holding the mutex of the event and
 * calls the 'push' callback of the listener of every watch whose
 * 'activity' is not zero. Each callback receives its own reference on
 * 'obj' (json_object_get); the caller's reference is dropped at the end.
 * NOTE(review): the loop header, the result counter and the return
 * statement are not visible in this view.
 */
224 int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *obj)
227 struct afb_evt_watch *watch;
228 struct afb_evt_listener *listener;
/* the callbacks below are invoked with the event mutex held */
231 pthread_mutex_lock(&evtid->mutex);
232 watch = evtid->watchs;
234 listener = watch->listener;
/* afb_evt_watch_add_evtid checks 'push' before creating a watch */
235 assert(listener->itf->push != NULL);
/* watches with a zero activity are kept in the list but receive nothing */
236 if (watch->activity != 0) {
237 listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(obj));
240 watch = watch->next_by_evtid;
242 pthread_mutex_unlock(&evtid->mutex);
/* release the reference given by the caller */
243 json_object_put(obj);
248 * Pushes the event 'evtid' with 'obj' to its listeners
249 * 'obj' is released (like json_object_put)
250 * Emits calls to hooks.
251 * Returns the count of listeners that received the event.
/*
 * Same as afb_evt_evtid_push but surrounded by the push hooks enabled in
 * 'evtid->hookflags'; the after-hook receives the result of the push.
 * NOTE(review): the declaration of 'result' and the return statement are
 * not visible in this view.
 */
253 int afb_evt_evtid_hooked_push(struct afb_evtid *evtid, struct json_object *obj)
257 /* take an extra reference: afb_evt_evtid_push releases one and 'obj' is still used by the after-hook */
258 json_object_get(obj);
260 /* hook before push */
261 if (evtid->hookflags & afb_hook_flag_evt_push_before)
262 afb_hook_evt_push_before(evtid->fullname, evtid->id, obj);
/* the push itself */
265 result = afb_evt_evtid_push(evtid, obj);
267 /* hook after push */
268 if (evtid->hookflags & afb_hook_flag_evt_push_after)
269 afb_hook_evt_push_after(evtid->fullname, evtid->id, obj, result);
271 /* release the extra reference */
272 json_object_put(obj);
/*
 * Detaches 'watch' from both lists it belongs to (the one of its event
 * and the one of its listener). When the watch is active and the
 * listener defines a 'remove' callback, that callback is called first.
 * The callers in this file hold the mutex of the listener and the mutex
 * of the event around the call.
 * NOTE(review): the two search-loop headers and whatever follows the
 * second unlink (presumably the release of 'watch') are not visible in
 * this view.
 */
279 static void remove_watch(struct afb_evt_watch *watch)
281 struct afb_evt_watch **prv;
282 struct afb_evtid *evtid;
283 struct afb_evt_listener *listener;
285 /* notify listener if needed */
286 evtid = watch->evtid;
287 listener = watch->listener;
288 if (watch->activity != 0 && listener->itf->remove != NULL)
289 listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
291 /* unlink the watch for its event */
292 prv = &evtid->watchs;
/* 'prv' is advanced to the link that points to 'watch' */
294 prv = &(*prv)->next_by_evtid;
295 *prv = watch->next_by_evtid;
297 /* unlink the watch for its listener */
298 prv = &listener->watchs;
300 prv = &(*prv)->next_by_listener;
301 *prv = watch->next_by_listener;
308 * Creates an event of name 'fullname' and returns it or NULL on error.
/*
 * Allocates an event descriptor holding a copy of 'fullname', gives it a
 * fresh id, selects its callback table from the hook flags computed for
 * the name and calls the creation hook when enabled.
 * NOTE(review): several short lines are not visible in this view: the
 * allocation failure check, the loop header, the lines between the id
 * search steps, one initialisation line after pthread_mutex_init and the
 * return statements. Confirm against the full file before modifying.
 */
310 struct afb_evtid *afb_evt_evtid_create(const char *fullname)
313 struct afb_evtid *evtid, *oevt;
315 /* allocates the event */
316 len = strlen(fullname);
/* one block for the descriptor and the name; assumes 'fullname' is a trailing array of the struct - TODO confirm */
317 evtid = malloc(len + sizeof * evtid);
321 /* allocates the id */
322 pthread_mutex_lock(&events_mutex);
/*
 * NOTE(review): this test relies on the counter becoming negative when
 * it overflows; signed overflow is undefined in C, consider testing
 * against INT_MAX before incrementing.
 */
324 if (++event_id_counter < 0) {
325 event_id_wrapped = 1;
326 event_id_counter = 1024; /* heuristic: small numbers are not destroyed */
/* once wrapped, the candidate id is compared with the ids of the existing events */
328 if (!event_id_wrapped)
331 while(oevt != NULL && oevt->id != event_id_counter)
/* a non NULL 'oevt' means the id is already in use: try the next one */
333 } while (oevt != NULL);
335 /* initialize the event */
336 memcpy(evtid->fullname, fullname, len + 1);
337 evtid->next = evtids;
338 evtid->watchs = NULL;
339 evtid->id = event_id_counter;
340 pthread_mutex_init(&evtid->mutex, NULL);
/* hooked events get the hooked callback table */
342 evtid->hookflags = afb_hook_flags_evt(evtid->fullname);
343 evtid->eventid.itf = evtid->hookflags ? &afb_evt_hooked_eventid_itf : &afb_evt_eventid_itf;
/* the creation hook is called with 'events_mutex' still held */
344 if (evtid->hookflags & afb_hook_flag_evt_create)
345 afb_hook_evt_create(evtid->fullname, evtid->id);
346 pthread_mutex_unlock(&events_mutex);
348 /* returns the event */
/*
 * Creates an event of name 'prefix'/'name' and returns it or NULL on error.
 *
 * The full name is assembled in a temporary heap buffer rather than with
 * alloca: the lengths come from the caller, so a stack allocation of
 * that size could overflow the stack. The buffer can be released right
 * after the creation because afb_evt_evtid_create copies the name.
 */
struct afb_evtid *afb_evt_evtid_create2(const char *prefix, const char *name)
{
	size_t prelen, postlen;
	char *fullname;
	struct afb_evtid *evtid;

	/* makes the event fullname */
	prelen = strlen(prefix);
	postlen = strlen(name);
	fullname = malloc(prelen + postlen + 2);
	if (fullname == NULL)
		return NULL;
	memcpy(fullname, prefix, prelen);
	fullname[prelen] = '/';
	memcpy(fullname + prelen + 1, name, postlen + 1); /* copies the final zero too */

	/* create the event */
	evtid = afb_evt_evtid_create(fullname);
	free(fullname);
	return evtid;
}
375 * increment the reference count of the event 'evtid'
/*
 * Atomically adds one to the reference count of 'evtid'.
 * NOTE(review): the return statement is not visible in this view;
 * callers in this file use the returned pointer.
 */
377 struct afb_evtid *afb_evt_evtid_addref(struct afb_evtid *evtid)
379 __atomic_add_fetch(&evtid->refcount, 1, __ATOMIC_RELAXED);
384 * increment the reference count of the event 'evtid'
386 struct afb_evtid *afb_evt_evtid_hooked_addref(struct afb_evtid *evtid)
388 if (evtid->hookflags & afb_hook_flag_evt_addref)
389 afb_hook_evt_addref(evtid->fullname, evtid->id);
390 return afb_evt_evtid_addref(evtid);
394 * decrement the reference count of the event 'evtid'
395 * and destroy it when the count reachs zero
397 void afb_evt_evtid_unref(struct afb_evtid *evtid)
400 struct afb_evtid **prv;
401 struct afb_evt_listener *listener;
403 if (!__atomic_sub_fetch(&evtid->refcount, 1, __ATOMIC_RELAXED)) {
404 /* unlinks the event if valid! */
405 pthread_mutex_lock(&events_mutex);
408 while (*prv && !(found = (*prv == evtid)))
412 pthread_mutex_unlock(&events_mutex);
414 /* destroys the event */
416 ERROR("event not found");
418 /* removes all watchers */
419 while(evtid->watchs != NULL) {
420 listener = evtid->watchs->listener;
421 pthread_mutex_lock(&listener->mutex);
422 pthread_mutex_lock(&evtid->mutex);
423 remove_watch(evtid->watchs);
424 pthread_mutex_unlock(&evtid->mutex);
425 pthread_mutex_unlock(&listener->mutex);
429 pthread_mutex_destroy(&evtid->mutex);
436 * decrement the reference count of the event 'evtid'
437 * and destroy it when the count reachs zero
439 void afb_evt_evtid_hooked_unref(struct afb_evtid *evtid)
441 if (evtid->hookflags & afb_hook_flag_evt_unref)
442 afb_hook_evt_unref(evtid->fullname, evtid->id);
443 afb_evt_evtid_unref(evtid);
447 * Returns the true name of the 'event'
449 const char *afb_evt_evtid_fullname(struct afb_evtid *evtid)
451 return evtid->fullname;
455 * Returns the name of the 'event'
457 const char *afb_evt_evtid_name(struct afb_evtid *evtid)
459 const char *name = strchr(evtid->fullname, '/');
460 return name ? name + 1 : evtid->fullname;
464 * Returns the name associated to the event 'evtid'.
/*
 * Computes the name of 'evtid' with afb_evt_evtid_name and reports it to
 * the "name" hook when that hook is enabled for the event.
 * NOTE(review): the return statement is not visible in this view;
 * presumably 'result' is returned.
 */
466 const char *afb_evt_evtid_hooked_name(struct afb_evtid *evtid)
468 const char *result = afb_evt_evtid_name(evtid);
469 if (evtid->hookflags & afb_hook_flag_evt_name)
470 afb_hook_evt_name(evtid->fullname, evtid->id, result);
475 * Returns the id of the 'event'
/*
 * NOTE(review): the body of this function is not visible in this view;
 * presumably it returns the 'id' member of 'evtid' - confirm.
 */
477 int afb_evt_evtid_id(struct afb_evtid *evtid)
483 * Returns an instance of the listener defined by the 'send' callback
485 * Returns NULL in case of memory depletion.
/*
 * Returns a listener for the pair ('itf', 'closure'). With
 * 'listeners_mutex' held, the global list is searched for a listener
 * having the same interface and the same closure; when one exists its
 * reference count is incremented. Otherwise a new zeroed listener is
 * allocated, initialised with a reference count of 1 and linked at the
 * head of the list.
 * NOTE(review): the lines leaving the search loop, the assignment of
 * 'itf' in the new listener and the return statement are not visible in
 * this view.
 */
487 struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, void *closure)
489 struct afb_evt_listener *listener;
491 /* search if an instance already exists */
492 pthread_mutex_lock(&listeners_mutex);
493 listener = listeners;
494 while (listener != NULL) {
495 if (listener->itf == itf && listener->closure == closure) {
496 listener = afb_evt_listener_addref(listener);
499 listener = listener->next;
/* not found: allocate a zero-filled listener */
503 listener = calloc(1, sizeof *listener);
504 if (listener != NULL) {
507 listener->closure = closure;
508 listener->watchs = NULL;
509 listener->refcount = 1;
510 pthread_mutex_init(&listener->mutex, NULL);
/* link at the head of the global list */
511 listener->next = listeners;
512 listeners = listener;
515 pthread_mutex_unlock(&listeners_mutex);
520 * Increases the reference count of 'listener' and returns it
/*
 * Atomically adds one to the reference count of 'listener'.
 * NOTE(review): the return statement is not visible in this view;
 * afb_evt_listener_create uses the returned pointer.
 */
522 struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener)
524 __atomic_add_fetch(&listener->refcount, 1, __ATOMIC_RELAXED);
529 * Decreases the reference count of the 'listener' and destroys it
532 void afb_evt_listener_unref(struct afb_evt_listener *listener)
534 struct afb_evt_listener **prv;
535 struct afb_evtid *evtid;
537 if (!__atomic_sub_fetch(&listener->refcount, 1, __ATOMIC_RELAXED)) {
539 /* unlink the listener */
540 pthread_mutex_lock(&listeners_mutex);
542 while (*prv != listener)
544 *prv = listener->next;
545 pthread_mutex_unlock(&listeners_mutex);
547 /* remove the watchers */
548 pthread_mutex_lock(&listener->mutex);
549 while (listener->watchs != NULL) {
550 evtid = listener->watchs->evtid;
551 pthread_mutex_lock(&evtid->mutex);
552 remove_watch(listener->watchs);
553 pthread_mutex_unlock(&evtid->mutex);
555 pthread_mutex_unlock(&listener->mutex);
557 /* free the listener */
558 pthread_mutex_destroy(&listener->mutex);
564 * Makes the 'listener' watching 'evtid'
565 * Returns 0 in case of success or else -1.
/*
 * Makes 'listener' watch 'evtid'. The listener must define a 'push'
 * callback. With the mutex of the listener held, the watches of the
 * listener are searched for one on 'evtid'; when none exists a new watch
 * is allocated and linked in both the list of the listener and (under
 * the mutex of the event) the list of the event. The 'add' callback of
 * the listener, when defined, is called if the watch activity is zero.
 * NOTE(review): the error paths, the jump out of the search loop, the
 * initialisation and the update of 'activity' and the return statements
 * are not visible in this view.
 */
567 int afb_evt_watch_add_evtid(struct afb_evt_listener *listener, struct afb_evtid *evtid)
569 struct afb_evt_watch *watch;
571 /* check parameter */
572 if (listener->itf->push == NULL) {
577 /* search the existing watch for the listener */
578 pthread_mutex_lock(&listener->mutex);
579 watch = listener->watchs;
580 while(watch != NULL) {
581 if (watch->evtid == evtid)
583 watch = watch->next_by_listener;
586 /* not found, allocate a new */
587 watch = malloc(sizeof *watch);
/* unlock on the allocation failure path (the surrounding test is not visible here) */
589 pthread_mutex_unlock(&listener->mutex);
594 /* initialise and link */
595 watch->evtid = evtid;
597 watch->listener = listener;
598 watch->next_by_listener = listener->watchs;
599 listener->watchs = watch;
/* lock order: listener first, then event */
600 pthread_mutex_lock(&evtid->mutex);
601 watch->next_by_evtid = evtid->watchs;
602 evtid->watchs = watch;
603 pthread_mutex_unlock(&evtid->mutex);
/* notify the listener when the watch was inactive so far */
606 if (watch->activity == 0 && listener->itf->add != NULL)
607 listener->itf->add(listener->closure, evtid->fullname, evtid->id);
609 pthread_mutex_unlock(&listener->mutex);
615 * Stops the 'listener' from watching 'evtid'
616 * Returns 0 in case of success or else -1.
/*
 * With the mutex of the listener held, searches the watch of 'listener'
 * on 'evtid'. For an active watch, the 'remove' callback of the
 * listener, when defined, is called once the activity is back to zero.
 * The watch itself is not unlinked here.
 * NOTE(review): the statement that changes 'activity' (presumably a
 * decrement), the closing braces and the return statements are not
 * visible in this view.
 */
618 int afb_evt_watch_sub_evtid(struct afb_evt_listener *listener, struct afb_evtid *evtid)
620 struct afb_evt_watch *watch;
622 /* search the existing watch */
623 pthread_mutex_lock(&listener->mutex);
624 watch = listener->watchs;
625 while(watch != NULL) {
626 if (watch->evtid == evtid) {
627 if (watch->activity != 0) {
/* notify the listener when the watch became inactive */
629 if (watch->activity == 0 && listener->itf->remove != NULL)
630 listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
/* found: unlock before leaving */
632 pthread_mutex_unlock(&listener->mutex);
635 watch = watch->next_by_listener;
/* not found: unlock before leaving */
637 pthread_mutex_unlock(&listener->mutex);
643 * update the hooks for events
645 void afb_evt_update_hooks()
647 struct afb_evtid *evtid;
649 pthread_mutex_lock(&events_mutex);
650 for (evtid = evtids ; evtid ; evtid = evtid->next) {
651 evtid->hookflags = afb_hook_flags_evt(evtid->fullname);
652 evtid->eventid.itf = evtid->hookflags ? &afb_evt_hooked_eventid_itf : &afb_evt_eventid_itf;
654 pthread_mutex_unlock(&events_mutex);
/*
 * Converts the public handle 'eventid' to the event descriptor by a
 * plain pointer cast; a NULL handle gives a NULL descriptor.
 */
inline struct afb_evtid *afb_evt_eventid_to_evtid(struct afb_eventid *eventid)
{
	struct afb_evtid *evtid = (struct afb_evtid *)eventid;

	return evtid;
}
662 inline struct afb_eventid *afb_evt_eventid_from_evtid(struct afb_evtid *evtid)
664 return &evtid->eventid;
/*
 * Creates an event of 'fullname' with afb_evt_evtid_create and returns
 * its public handle. The result of the creation is converted with
 * afb_evt_eventid_from_evtid without being checked first.
 */
struct afb_eventid *afb_evt_eventid_create(const char *fullname)
{
	struct afb_evtid *evtid = afb_evt_evtid_create(fullname);

	return afb_evt_eventid_from_evtid(evtid);
}
/*
 * Creates an event of name 'prefix'/'name' with afb_evt_evtid_create2
 * and returns its public handle. The result of the creation is converted
 * with afb_evt_eventid_from_evtid without being checked first.
 */
struct afb_eventid *afb_evt_eventid_create2(const char *prefix, const char *name)
{
	struct afb_evtid *evtid = afb_evt_evtid_create2(prefix, name);

	return afb_evt_eventid_from_evtid(evtid);
}
686 * Returns the fullname of the 'eventid'
688 const char *afb_evt_eventid_fullname(struct afb_eventid *eventid)
690 struct afb_evtid *evtid = afb_evt_eventid_to_evtid(eventid);
691 return evtid ? evtid->fullname : NULL;
695 * Returns the id of the 'eventid'
697 int afb_evt_eventid_id(struct afb_eventid *eventid)
699 struct afb_evtid *evtid = afb_evt_eventid_to_evtid(eventid);
700 return evtid ? evtid->id : 0;
704 * Makes the 'listener' watching 'eventid'
705 * Returns 0 in case of success or else -1.
/*
 * Converts 'eventid' to its event descriptor and forwards the request
 * to afb_evt_watch_add_evtid.
 * NOTE(review): the statements under "check parameter" are not visible
 * in this view; presumably they reject a NULL descriptor - confirm.
 */
707 int afb_evt_eventid_add_watch(struct afb_evt_listener *listener, struct afb_eventid *eventid)
709 struct afb_evtid *evtid = afb_evt_eventid_to_evtid(eventid);
711 /* check parameter */
717 /* search the existing watch for the listener */
718 return afb_evt_watch_add_evtid(listener, evtid);
722 * Stops the 'listener' from watching 'eventid'
723 * Returns 0 in case of success or else -1.
/*
 * Converts 'eventid' to its event descriptor and forwards the request
 * to afb_evt_watch_sub_evtid.
 * NOTE(review): the statements under "check parameter" are not visible
 * in this view; presumably they reject a NULL descriptor - confirm.
 */
725 int afb_evt_eventid_remove_watch(struct afb_evt_listener *listener, struct afb_eventid *eventid)
727 struct afb_evtid *evtid = afb_evt_eventid_to_evtid(eventid);
729 /* check parameter */
735 /* search the existing watch */
736 return afb_evt_watch_sub_evtid(listener, evtid);
/*
 * Pushes 'object' for the event 'eventid' through the hooked push.
 * NOTE(review): a line is missing before the return in this view,
 * presumably a test on 'evtid' that guards it; in that reading the
 * json_object_put below releases 'object' when there is no event.
 * The final return statement is not visible either.
 */
739 int afb_evt_eventid_push(struct afb_eventid *eventid, struct json_object *object)
741 struct afb_evtid *evtid = afb_evt_eventid_to_evtid(eventid);
743 return afb_evt_evtid_hooked_push(evtid, object);
744 json_object_put(object);
/*
 * Pushes 'object' for the event 'eventid' without calling the hooks.
 * NOTE(review): a line is missing before the return in this view,
 * presumably a test on 'evtid' that guards it; in that reading the
 * json_object_put below releases 'object' when there is no event.
 * The final return statement is not visible either.
 */
748 int afb_evt_eventid_unhooked_push(struct afb_eventid *eventid, struct json_object *object)
750 struct afb_evtid *evtid = afb_evt_eventid_to_evtid(eventid);
752 return afb_evt_evtid_push(evtid, object);
753 json_object_put(object);
/*
 * Builds a 'struct afb_event' value from an event descriptor: either
 * the hooked callback table with the 'eventid' member of 'evtid' as
 * closure, or an event whose 'itf' and 'closure' are both NULL.
 * The hooked table is used whatever the hook flags of the event are.
 * NOTE(review): the line holding the condition of the conditional
 * expression is not visible in this view; presumably it tests 'evtid'.
 */
757 struct afb_event afb_evt_event_from_evtid(struct afb_evtid *evtid)
760 ? (struct afb_event){ .itf = &afb_evt_hooked_eventid_itf, .closure = &evtid->eventid }
761 : (struct afb_event){ .itf = NULL, .closure = NULL };
/*
 * Converts 'eventid' to its event descriptor and releases one reference
 * with afb_evt_evtid_unref (the variant that calls no hook).
 * NOTE(review): a line is missing before the call in this view,
 * presumably a test on 'evtid' - confirm.
 */
764 void afb_evt_eventid_unref(struct afb_eventid *eventid)
766 struct afb_evtid *evtid = afb_evt_eventid_to_evtid(eventid);
768 afb_evt_evtid_unref(evtid);
/*
 * Converts 'eventid' to its event descriptor and takes one reference
 * with afb_evt_evtid_addref (the variant that calls no hook).
 * NOTE(review): a line is missing before the call in this view,
 * presumably a test on 'evtid', and the return statement is not
 * visible either - confirm.
 */
771 struct afb_eventid *afb_evt_eventid_addref(struct afb_eventid *eventid)
773 struct afb_evtid *evtid = afb_evt_eventid_to_evtid(eventid);
775 afb_evt_evtid_addref(evtid);