X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fafb-evt.c;h=f54f34e17ae2520c757735b1de06a85bc97622c6;hb=6f13ad1989875b5a0ce50b24211fd1fba093735f;hp=a75cbbcc2a9ce00aa139300dbec1415adb8fec23;hpb=60cd11786766ebc148b7ec088962dd6e112f8762;p=src%2Fapp-framework-binder.git diff --git a/src/afb-evt.c b/src/afb-evt.c index a75cbbcc..f54f34e1 100644 --- a/src/afb-evt.c +++ b/src/afb-evt.c @@ -30,6 +30,8 @@ #include "afb-evt.h" #include "afb-hook.h" #include "verbose.h" +#include "jobs.h" +#include "uuid.h" struct afb_evt_watch; @@ -85,6 +87,9 @@ struct afb_evtid { /* id of the event */ int id; + /* has client? */ + int has_client; + /* fullname of the event */ char fullname[]; }; @@ -110,6 +115,36 @@ struct afb_evt_watch { unsigned activity; }; +/* + * structure for job of broadcasting events + */ +struct job_broadcast +{ + /** object atached to the event */ + struct json_object *object; + + /** the uuid of the event */ + uuid_binary_t uuid; + + /** remaining hop */ + uint8_t hop; + + /** name of the event to broadcast */ + char event[]; +}; + +/* + * structure for job of broadcasting or pushing events + */ +struct job_evtid +{ + /** the event to broadcast */ + struct afb_evtid *evtid; + + /** object atached to the event */ + struct json_object *object; +}; + /* the interface for events */ static struct afb_event_x2_itf afb_evt_event_x2_itf = { .broadcast = (void*)afb_evt_evtid_broadcast, @@ -130,6 +165,10 @@ static struct afb_event_x2_itf afb_evt_hooked_event_x2_itf = { }; #endif +/* job groups for events push/broadcast */ +#define BROADCAST_JOB_GROUP (&afb_evt_event_x2_itf) +#define PUSH_JOB_GROUP (&afb_evt_event_x2_itf) + /* head of the list of listeners */ static pthread_rwlock_t listeners_rwlock = PTHREAD_RWLOCK_INITIALIZER; static struct afb_evt_listener *listeners = NULL; @@ -140,30 +179,167 @@ static struct afb_evtid *evtids = NULL; static int event_id_counter = 0; static int event_id_wrapped = 0; +/* head of uniqueness of events */ +#if !defined(EVENT_BROADCAST_HOP_MAX) +# define EVENT_BROADCAST_HOP_MAX 10 +#endif +#if !defined(EVENT_BROADCAST_MEMORY_COUNT) +# define EVENT_BROADCAST_MEMORY_COUNT 8 +#endif + +#if EVENT_BROADCAST_MEMORY_COUNT +static struct { + pthread_mutex_t mutex; + uint8_t base; + uint8_t count; + uuid_binary_t uuids[EVENT_BROADCAST_MEMORY_COUNT]; +} uniqueness = { + .mutex = PTHREAD_MUTEX_INITIALIZER, + .base = 0, + .count = 0 +}; +#endif + /* - * Broadcasts the 'event' of 'id' with its 'obj' - * 'obj' is released (like json_object_put) - * Returns the count of listener having receive the event. + * Create structure for job of broadcasting string 'event' with 'object' + * Returns the created structure or NULL if out of memory */ -static int broadcast(const char *event, struct json_object *obj, int id) +static struct job_broadcast *make_job_broadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop) { - int result; - struct afb_evt_listener *listener; + size_t sz = 1 + strlen(event); + struct job_broadcast *jb = malloc(sz + sizeof *jb); + if (jb) { + jb->object = object; + memcpy(jb->uuid, uuid, sizeof jb->uuid); + jb->hop = hop; + memcpy(jb->event, event, sz); + } + return jb; +} - result = 0; +/* + * Destroy structure 'jb' for job of broadcasting string events + */ +static void destroy_job_broadcast(struct job_broadcast *jb) +{ + json_object_put(jb->object); + free(jb); +} + +/* + * Create structure for job of broadcasting or pushing 'evtid' with 'object' + * Returns the created structure or NULL if out of memory + */ +static struct job_evtid *make_job_evtid(struct afb_evtid *evtid, struct json_object *object) +{ + struct job_evtid *je = malloc(sizeof *je); + if (je) { + je->evtid = afb_evt_evtid_addref(evtid); + je->object = object; + } + return je; +} + +/* + * Destroy structure for job of broadcasting or pushing evtid + */ +static void destroy_job_evtid(struct job_evtid *je) +{ + afb_evt_evtid_unref(je->evtid); + json_object_put(je->object); + free(je); +} + +/* + * Broadcasts the 'event' of 'id' with its 'object' + */ +static void broadcast(struct job_broadcast *jb) +{ + struct afb_evt_listener *listener; pthread_rwlock_rdlock(&listeners_rwlock); listener = listeners; while(listener) { - if (listener->itf->broadcast != NULL) { - listener->itf->broadcast(listener->closure, event, id, json_object_get(obj)); - result++; - } + if (listener->itf->broadcast != NULL) + listener->itf->broadcast(listener->closure, jb->event, json_object_get(jb->object), jb->uuid, jb->hop); listener = listener->next; } pthread_rwlock_unlock(&listeners_rwlock); - json_object_put(obj); - return result; +} + +/* + * Jobs callback for broadcasting string asynchronously + */ +static void broadcast_job(int signum, void *closure) +{ + struct job_broadcast *jb = closure; + + if (signum == 0) + broadcast(jb); + destroy_job_broadcast(jb); +} + +/* + * Broadcasts the string 'event' with its 'object' + */ +static int unhooked_broadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop) +{ + uuid_binary_t local_uuid; + struct job_broadcast *jb; + int rc; +#if EVENT_BROADCAST_MEMORY_COUNT + int iter, count; +#endif + + /* check if lately sent */ + if (!uuid) { + uuid_new_binary(local_uuid); + uuid = local_uuid; + hop = EVENT_BROADCAST_HOP_MAX; +#if EVENT_BROADCAST_MEMORY_COUNT + pthread_mutex_lock(&uniqueness.mutex); + } else { + pthread_mutex_lock(&uniqueness.mutex); + iter = (int)uniqueness.base; + count = (int)uniqueness.count; + while (count) { + if (0 == memcmp(uuid, uniqueness.uuids[iter], sizeof(uuid_binary_t))) { + pthread_mutex_unlock(&uniqueness.mutex); + return 0; + } + if (++iter == EVENT_BROADCAST_MEMORY_COUNT) + iter = 0; + count--; + } + } + iter = (int)uniqueness.base; + if (uniqueness.count < EVENT_BROADCAST_MEMORY_COUNT) + iter += (int)(uniqueness.count++); + else if (++uniqueness.base == EVENT_BROADCAST_MEMORY_COUNT) + uniqueness.base = 0; + memcpy(uniqueness.uuids[iter], uuid, sizeof(uuid_binary_t)); + pthread_mutex_unlock(&uniqueness.mutex); +#else + } +#endif + + /* create the structure for the job */ + jb = make_job_broadcast(event, object, uuid, hop); + if (jb == NULL) { + ERROR("Cant't create broadcast string job item for %s(%s)", + event, json_object_to_json_string(object)); + json_object_put(object); + return -1; + } + + /* queue the job */ + rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job, jb); + if (rc) { + ERROR("cant't queue broadcast string job item for %s(%s)", + event, json_object_to_json_string(object)); + destroy_job_broadcast(jb); + } + return rc; } /* @@ -173,45 +349,52 @@ static int broadcast(const char *event, struct json_object *obj, int id) */ int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object) { - return broadcast(evtid->fullname, object, evtid->id); + return unhooked_broadcast(evtid->fullname, object, NULL, 0); } #if WITH_AFB_HOOK /* - * Broadcasts the 'event' of 'id' with its 'obj' - * 'obj' is released (like json_object_put) - * calls hooks if hookflags isn't 0 - * Returns the count of listener having receive the event. + * Broadcasts the event 'evtid' with its 'object' + * 'object' is released (like json_object_put) + * Returns the count of listener that received the event. */ -static int hooked_broadcast(const char *event, struct json_object *obj, int id, int hookflags) +int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *object) { int result; - json_object_get(obj); + json_object_get(object); - if (hookflags & afb_hook_flag_evt_broadcast_before) - afb_hook_evt_broadcast_before(event, id, obj); + if (evtid->hookflags & afb_hook_flag_evt_broadcast_before) + afb_hook_evt_broadcast_before(evtid->fullname, evtid->id, object); - result = broadcast(event, obj, id); + result = afb_evt_evtid_broadcast(evtid, object); - if (hookflags & afb_hook_flag_evt_broadcast_after) - afb_hook_evt_broadcast_after(event, id, obj, result); + if (evtid->hookflags & afb_hook_flag_evt_broadcast_after) + afb_hook_evt_broadcast_after(evtid->fullname, evtid->id, object, result); - json_object_put(obj); + json_object_put(object); return result; } +#endif -/* - * Broadcasts the event 'evtid' with its 'object' - * 'object' is released (like json_object_put) - * Returns the count of listener that received the event. - */ -int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *object) +int afb_evt_rebroadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop) { - return hooked_broadcast(evtid->fullname, object, evtid->id, evtid->hookflags); -} + int result; + +#if WITH_AFB_HOOK + json_object_get(object); + afb_hook_evt_broadcast_before(event, 0, object); +#endif + + result = unhooked_broadcast(event, object, uuid, hop); + +#if WITH_AFB_HOOK + afb_hook_evt_broadcast_after(event, 0, object, result); + json_object_put(object); #endif + return result; +} /* * Broadcasts the 'event' with its 'object' @@ -220,40 +403,109 @@ int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object * */ int afb_evt_broadcast(const char *event, struct json_object *object) { -#if WITH_AFB_HOOK - return hooked_broadcast(event, object, 0, -1); -#else - return broadcast(event, object, 0); -#endif + return afb_evt_rebroadcast(event, object, NULL, 0); } /* * Pushes the event 'evtid' with 'obj' to its listeners - * 'obj' is released (like json_object_put) * Returns the count of listener that received the event. */ -int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *obj) +static void push_evtid(struct afb_evtid *evtid, struct json_object *object) { - int result; + int has_client; struct afb_evt_watch *watch; struct afb_evt_listener *listener; - result = 0; + has_client = 0; pthread_rwlock_rdlock(&evtid->rwlock); watch = evtid->watchs; while(watch) { listener = watch->listener; assert(listener->itf->push != NULL); if (watch->activity != 0) { - listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(obj)); - result++; + listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(object)); + has_client = 1; } watch = watch->next_by_evtid; } + evtid->has_client = has_client; pthread_rwlock_unlock(&evtid->rwlock); +} + +/* + * Jobs callback for pushing evtid asynchronously + */ +static void push_job_evtid(int signum, void *closure) +{ + struct job_evtid *je = closure; + + if (signum == 0) + push_evtid(je->evtid, je->object); + destroy_job_evtid(je); +} + +/* + * Pushes the event 'evtid' with 'obj' to its listeners + * 'obj' is released (like json_object_put) + * Returns 1 if at least one listener exists or 0 if no listener exists or + * -1 in case of error and the event can't be delivered + */ +int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *object) +{ + struct job_evtid *je; + int rc; + + je = make_job_evtid(evtid, object); + if (je == NULL) { + ERROR("Cant't create push evtid job item for %s(%s)", + evtid->fullname, json_object_to_json_string(object)); + json_object_put(object); + return -1; + } + + rc = jobs_queue(PUSH_JOB_GROUP, 0, push_job_evtid, je); + if (rc == 0) + rc = evtid->has_client; + else { + ERROR("cant't queue push evtid job item for %s(%s)", + evtid->fullname, json_object_to_json_string(object)); + destroy_job_evtid(je); + } + + return rc; +} + +#if WITH_AFB_HOOK +/* + * Pushes the event 'evtid' with 'obj' to its listeners + * 'obj' is released (like json_object_put) + * Emits calls to hooks. + * Returns the count of listener taht received the event. + */ +int afb_evt_evtid_hooked_push(struct afb_evtid *evtid, struct json_object *obj) +{ + + int result; + + /* lease the object */ + json_object_get(obj); + + /* hook before push */ + if (evtid->hookflags & afb_hook_flag_evt_push_before) + afb_hook_evt_push_before(evtid->fullname, evtid->id, obj); + + /* push */ + result = afb_evt_evtid_push(evtid, obj); + + /* hook after push */ + if (evtid->hookflags & afb_hook_flag_evt_push_after) + afb_hook_evt_push_after(evtid->fullname, evtid->id, obj, result); + + /* release the object */ json_object_put(obj); return result; } +#endif /* * remove the 'watch' @@ -320,6 +572,7 @@ struct afb_evtid *afb_evt_evtid_create(const char *fullname) evtid->refcount = 1; evtid->watchs = NULL; evtid->id = event_id_counter; + evtid->has_client = 0; pthread_rwlock_init(&evtid->rwlock, NULL); evtids = evtid; #if WITH_AFB_HOOK @@ -367,6 +620,18 @@ struct afb_evtid *afb_evt_evtid_addref(struct afb_evtid *evtid) return evtid; } +#if WITH_AFB_HOOK +/* + * increment the reference count of the event 'evtid' + */ +struct afb_evtid *afb_evt_evtid_hooked_addref(struct afb_evtid *evtid) +{ + if (evtid->hookflags & afb_hook_flag_evt_addref) + afb_hook_evt_addref(evtid->fullname, evtid->id); + return afb_evt_evtid_addref(evtid); +} +#endif + /* * decrement the reference count of the event 'evtid' * and destroy it when the count reachs zero @@ -409,6 +674,19 @@ void afb_evt_evtid_unref(struct afb_evtid *evtid) } } +#if WITH_AFB_HOOK +/* + * decrement the reference count of the event 'evtid' + * and destroy it when the count reachs zero + */ +void afb_evt_evtid_hooked_unref(struct afb_evtid *evtid) +{ + if (evtid->hookflags & afb_hook_flag_evt_unref) + afb_hook_evt_unref(evtid->fullname, evtid->id); + afb_evt_evtid_unref(evtid); +} +#endif + /* * Returns the true name of the 'event' */ @@ -426,6 +704,19 @@ const char *afb_evt_evtid_name(struct afb_evtid *evtid) return name ? name + 1 : evtid->fullname; } +#if WITH_AFB_HOOK +/* + * Returns the name associated to the event 'evtid'. + */ +const char *afb_evt_evtid_hooked_name(struct afb_evtid *evtid) +{ + const char *result = afb_evt_evtid_name(evtid); + if (evtid->hookflags & afb_hook_flag_evt_name) + afb_hook_evt_name(evtid->fullname, evtid->id, result); + return result; +} +#endif + /* * Returns the id of the 'event' */ @@ -561,6 +852,7 @@ found: if (watch->activity == 0 && listener->itf->add != NULL) listener->itf->add(listener->closure, evtid->fullname, evtid->id); watch->activity++; + evtid->has_client = 1; pthread_rwlock_unlock(&listener->rwlock); return 0; @@ -594,6 +886,23 @@ int afb_evt_watch_sub_evtid(struct afb_evt_listener *listener, struct afb_evtid return -1; } +#if WITH_AFB_HOOK +/* + * update the hooks for events + */ +void afb_evt_update_hooks() +{ + struct afb_evtid *evtid; + + pthread_rwlock_rdlock(&events_rwlock); + for (evtid = evtids ; evtid ; evtid = evtid->next) { + evtid->hookflags = afb_hook_flags_evt(evtid->fullname); + evtid->eventid.itf = evtid->hookflags ? &afb_evt_hooked_event_x2_itf : &afb_evt_event_x2_itf; + } + pthread_rwlock_unlock(&events_rwlock); +} +#endif + inline struct afb_evtid *afb_evt_event_x2_to_evtid(struct afb_event_x2 *eventid) { return (struct afb_evtid*)eventid; @@ -726,82 +1035,3 @@ struct afb_event_x2 *afb_evt_event_x2_addref(struct afb_event_x2 *eventid) return eventid; } -#if WITH_AFB_HOOK -/* - * Pushes the event 'evtid' with 'obj' to its listeners - * 'obj' is released (like json_object_put) - * Emits calls to hooks. - * Returns the count of listener taht received the event. - */ -int afb_evt_evtid_hooked_push(struct afb_evtid *evtid, struct json_object *obj) -{ - - int result; - - /* lease the object */ - json_object_get(obj); - - /* hook before push */ - if (evtid->hookflags & afb_hook_flag_evt_push_before) - afb_hook_evt_push_before(evtid->fullname, evtid->id, obj); - - /* push */ - result = afb_evt_evtid_push(evtid, obj); - - /* hook after push */ - if (evtid->hookflags & afb_hook_flag_evt_push_after) - afb_hook_evt_push_after(evtid->fullname, evtid->id, obj, result); - - /* release the object */ - json_object_put(obj); - return result; -} - -/* - * increment the reference count of the event 'evtid' - */ -struct afb_evtid *afb_evt_evtid_hooked_addref(struct afb_evtid *evtid) -{ - if (evtid->hookflags & afb_hook_flag_evt_addref) - afb_hook_evt_addref(evtid->fullname, evtid->id); - return afb_evt_evtid_addref(evtid); -} - -/* - * decrement the reference count of the event 'evtid' - * and destroy it when the count reachs zero - */ -void afb_evt_evtid_hooked_unref(struct afb_evtid *evtid) -{ - if (evtid->hookflags & afb_hook_flag_evt_unref) - afb_hook_evt_unref(evtid->fullname, evtid->id); - afb_evt_evtid_unref(evtid); -} - -/* - * Returns the name associated to the event 'evtid'. - */ -const char *afb_evt_evtid_hooked_name(struct afb_evtid *evtid) -{ - const char *result = afb_evt_evtid_name(evtid); - if (evtid->hookflags & afb_hook_flag_evt_name) - afb_hook_evt_name(evtid->fullname, evtid->id, result); - return result; -} - -/* - * update the hooks for events - */ -void afb_evt_update_hooks() -{ - struct afb_evtid *evtid; - - pthread_rwlock_rdlock(&events_rwlock); - for (evtid = evtids ; evtid ; evtid = evtid->next) { - evtid->hookflags = afb_hook_flags_evt(evtid->fullname); - evtid->eventid.itf = evtid->hookflags ? &afb_evt_hooked_event_x2_itf : &afb_evt_event_x2_itf; - } - pthread_rwlock_unlock(&events_rwlock); -} -#endif -