X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fafb-evt.c;h=1c8798fdc39cb8858a407f91748552c19fc73ff3;hb=f40979c718fa6fe6b571e133e1bf19fc90957298;hp=46bcf6dbc426bc58bbbd70629c6aed35638ce3e7;hpb=5b5968815dc672467f40ed6b49f05a590bdb8b4d;p=src%2Fapp-framework-binder.git diff --git a/src/afb-evt.c b/src/afb-evt.c index 46bcf6db..1c8798fd 100644 --- a/src/afb-evt.c +++ b/src/afb-evt.c @@ -31,6 +31,7 @@ #include "afb-hook.h" #include "verbose.h" #include "jobs.h" +#include "uuid.h" struct afb_evt_watch; @@ -55,7 +56,7 @@ struct afb_evt_listener { pthread_rwlock_t rwlock; /* count of reference to the listener */ - int refcount; + uint16_t refcount; }; /* @@ -81,10 +82,10 @@ struct afb_evtid { #endif /* refcount */ - int refcount; + uint16_t refcount; /* id of the event */ - int id; + uint16_t id; /* has client? */ int has_client; @@ -122,6 +123,12 @@ struct job_broadcast /** object atached to the event */ struct json_object *object; + /** the uuid of the event */ + uuid_binary_t uuid; + + /** remaining hop */ + uint8_t hop; + /** name of the event to broadcast */ char event[]; }; @@ -169,19 +176,42 @@ static struct afb_evt_listener *listeners = NULL; /* handling id of events */ static pthread_rwlock_t events_rwlock = PTHREAD_RWLOCK_INITIALIZER; static struct afb_evtid *evtids = NULL; -static int event_id_counter = 0; -static int event_id_wrapped = 0; +static uint16_t event_genid = 0; +static uint16_t event_count = 0; + +/* head of uniqueness of events */ +#if !defined(EVENT_BROADCAST_HOP_MAX) +# define EVENT_BROADCAST_HOP_MAX 10 +#endif +#if !defined(EVENT_BROADCAST_MEMORY_COUNT) +# define EVENT_BROADCAST_MEMORY_COUNT 8 +#endif + +#if EVENT_BROADCAST_MEMORY_COUNT +static struct { + pthread_mutex_t mutex; + uint8_t base; + uint8_t count; + uuid_binary_t uuids[EVENT_BROADCAST_MEMORY_COUNT]; +} uniqueness = { + .mutex = PTHREAD_MUTEX_INITIALIZER, + .base = 0, + .count = 0 +}; +#endif /* * Create structure for job of broadcasting string 'event' with 'object' * Returns the created structure or NULL if out of memory */ -static struct job_broadcast *make_job_broadcast(const char *event, struct json_object *object) +static struct job_broadcast *make_job_broadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop) { size_t sz = 1 + strlen(event); struct job_broadcast *jb = malloc(sz + sizeof *jb); if (jb) { jb->object = object; + memcpy(jb->uuid, uuid, sizeof jb->uuid); + jb->hop = hop; memcpy(jb->event, event, sz); } return jb; @@ -223,7 +253,7 @@ static void destroy_job_evtid(struct job_evtid *je) /* * Broadcasts the 'event' of 'id' with its 'object' */ -static void broadcast(const char *event, struct json_object *object) +static void broadcast(struct job_broadcast *jb) { struct afb_evt_listener *listener; @@ -231,7 +261,7 @@ static void broadcast(const char *event, struct json_object *object) listener = listeners; while(listener) { if (listener->itf->broadcast != NULL) - listener->itf->broadcast(listener->closure, event, json_object_get(object)); + listener->itf->broadcast(listener->closure, jb->event, json_object_get(jb->object), jb->uuid, jb->hop); listener = listener->next; } pthread_rwlock_unlock(&listeners_rwlock); @@ -245,19 +275,56 @@ static void broadcast_job(int signum, void *closure) struct job_broadcast *jb = closure; if (signum == 0) - broadcast(jb->event, jb->object); + broadcast(jb); destroy_job_broadcast(jb); } /* * Broadcasts the string 'event' with its 'object' */ -static int unhooked_broadcast(const char *event, struct json_object *object) +static int unhooked_broadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop) { + uuid_binary_t local_uuid; struct job_broadcast *jb; int rc; +#if EVENT_BROADCAST_MEMORY_COUNT + int iter, count; +#endif + + /* check if lately sent */ + if (!uuid) { + uuid_new_binary(local_uuid); + uuid = local_uuid; + hop = EVENT_BROADCAST_HOP_MAX; +#if EVENT_BROADCAST_MEMORY_COUNT + pthread_mutex_lock(&uniqueness.mutex); + } else { + pthread_mutex_lock(&uniqueness.mutex); + iter = (int)uniqueness.base; + count = (int)uniqueness.count; + while (count) { + if (0 == memcmp(uuid, uniqueness.uuids[iter], sizeof(uuid_binary_t))) { + pthread_mutex_unlock(&uniqueness.mutex); + return 0; + } + if (++iter == EVENT_BROADCAST_MEMORY_COUNT) + iter = 0; + count--; + } + } + iter = (int)uniqueness.base; + if (uniqueness.count < EVENT_BROADCAST_MEMORY_COUNT) + iter += (int)(uniqueness.count++); + else if (++uniqueness.base == EVENT_BROADCAST_MEMORY_COUNT) + uniqueness.base = 0; + memcpy(uniqueness.uuids[iter], uuid, sizeof(uuid_binary_t)); + pthread_mutex_unlock(&uniqueness.mutex); +#else + } +#endif - jb = make_job_broadcast(event, object); + /* create the structure for the job */ + jb = make_job_broadcast(event, object, uuid, hop); if (jb == NULL) { ERROR("Cant't create broadcast string job item for %s(%s)", event, json_object_to_json_string(object)); @@ -265,6 +332,7 @@ static int unhooked_broadcast(const char *event, struct json_object *object) return -1; } + /* queue the job */ rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job, jb); if (rc) { ERROR("cant't queue broadcast string job item for %s(%s)", @@ -281,7 +349,7 @@ static int unhooked_broadcast(const char *event, struct json_object *object) */ int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object) { - return unhooked_broadcast(evtid->fullname, object); + return unhooked_broadcast(evtid->fullname, object, NULL, 0); } #if WITH_AFB_HOOK @@ -310,12 +378,7 @@ int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object * } #endif -/* - * Broadcasts the 'event' with its 'object' - * 'object' is released (like json_object_put) - * Returns the count of listener having receive the event. - */ -int afb_evt_broadcast(const char *event, struct json_object *object) +int afb_evt_rebroadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop) { int result; @@ -324,7 +387,7 @@ int afb_evt_broadcast(const char *event, struct json_object *object) afb_hook_evt_broadcast_before(event, 0, object); #endif - result = unhooked_broadcast(event, object); + result = unhooked_broadcast(event, object, uuid, hop); #if WITH_AFB_HOOK afb_hook_evt_broadcast_after(event, 0, object, result); @@ -333,6 +396,16 @@ int afb_evt_broadcast(const char *event, struct json_object *object) return result; } +/* + * Broadcasts the 'event' with its 'object' + * 'object' is released (like json_object_put) + * Returns the count of listener having receive the event. + */ +int afb_evt_broadcast(const char *event, struct json_object *object) +{ + return afb_evt_rebroadcast(event, object, NULL, 0); +} + /* * Pushes the event 'evtid' with 'obj' to its listeners * Returns the count of listener that received the event. @@ -472,6 +545,7 @@ struct afb_evtid *afb_evt_evtid_create(const char *fullname) { size_t len; struct afb_evtid *evtid, *oevt; + uint16_t id; /* allocates the event */ len = strlen(fullname); @@ -481,15 +555,20 @@ struct afb_evtid *afb_evt_evtid_create(const char *fullname) /* allocates the id */ pthread_rwlock_wrlock(&events_rwlock); + if (event_count == UINT16_MAX) { + pthread_rwlock_unlock(&events_rwlock); + free(evtid); + ERROR("Can't create more events"); + return NULL; + } + event_count++; do { - if (++event_id_counter < 0) { - event_id_wrapped = 1; - event_id_counter = 1024; /* heuristic: small numbers are not destroyed */ - } - if (!event_id_wrapped) - break; + /* TODO add a guard (counting number of event created) */ + id = ++event_genid; + if (!id) + id = event_genid = 1; oevt = evtids; - while(oevt != NULL && oevt->id != event_id_counter) + while(oevt != NULL && oevt->id != id) oevt = oevt->next; } while (oevt != NULL); @@ -498,7 +577,7 @@ struct afb_evtid *afb_evt_evtid_create(const char *fullname) evtid->next = evtids; evtid->refcount = 1; evtid->watchs = NULL; - evtid->id = event_id_counter; + evtid->id = id; evtid->has_client = 0; pthread_rwlock_init(&evtid->rwlock, NULL); evtids = evtid; @@ -576,8 +655,10 @@ void afb_evt_evtid_unref(struct afb_evtid *evtid) prv = &evtids; while (*prv && !(found = (*prv == evtid))) prv = &(*prv)->next; - if (found) + if (found) { *prv = evtid->next; + event_count--; + } pthread_rwlock_unlock(&events_rwlock); /* destroys the event */ @@ -647,7 +728,7 @@ const char *afb_evt_evtid_hooked_name(struct afb_evtid *evtid) /* * Returns the id of the 'event' */ -int afb_evt_evtid_id(struct afb_evtid *evtid) +uint16_t afb_evt_evtid_id(struct afb_evtid *evtid) { return evtid->id; } @@ -813,6 +894,36 @@ int afb_evt_watch_sub_evtid(struct afb_evt_listener *listener, struct afb_evtid return -1; } +/* + * Avoids the 'listener' to watch 'eventid' + * Returns 0 in case of success or else -1. + */ +int afb_evt_watch_sub_eventid(struct afb_evt_listener *listener, uint16_t eventid) +{ + struct afb_evt_watch *watch; + struct afb_evtid *evtid; + + /* search the existing watch */ + pthread_rwlock_wrlock(&listener->rwlock); + watch = listener->watchs; + while(watch != NULL) { + evtid = watch->evtid; + if (evtid->id == eventid) { + if (watch->activity != 0) { + watch->activity--; + if (watch->activity == 0 && listener->itf->remove != NULL) + listener->itf->remove(listener->closure, evtid->fullname, evtid->id); + } + pthread_rwlock_unlock(&listener->rwlock); + return 0; + } + watch = watch->next_by_listener; + } + pthread_rwlock_unlock(&listener->rwlock); + errno = ENOENT; + return -1; +} + #if WITH_AFB_HOOK /* * update the hooks for events @@ -870,7 +981,7 @@ const char *afb_evt_event_x2_fullname(struct afb_event_x2 *eventid) /* * Returns the id of the 'eventid' */ -int afb_evt_event_x2_id(struct afb_event_x2 *eventid) +uint16_t afb_evt_event_x2_id(struct afb_event_x2 *eventid) { struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid); return evtid ? evtid->id : 0;