From ca9807f73646f536ac58c002d963a8bb8d245f5d Mon Sep 17 00:00:00 2001 From: =?utf8?q?Jos=C3=A9=20Bollo?= Date: Mon, 10 Apr 2017 21:39:22 +0200 Subject: [PATCH] Make implementation multithread MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit This changes makes many improvement needed for multi-threading: - json object can't be shared across threads because get/set is not protected - event are now multithread compatible Change-Id: Id44b12c68e0fa67042b8ea44939af4edfa76270a Signed-off-by: José Bollo --- src/afb-evt.c | 124 ++++++++++++++++++++++++++++++++++++++--------------- src/afb-msg-json.c | 14 +++--- src/afb-ws.c | 2 +- src/sig-monitor.c | 2 + 4 files changed, 98 insertions(+), 44 deletions(-) diff --git a/src/afb-evt.c b/src/afb-evt.c index 8ae31cc5..2abbc819 100644 --- a/src/afb-evt.c +++ b/src/afb-evt.c @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -47,6 +48,9 @@ struct afb_evt_listener { /* head of the list of events listened */ struct afb_evt_watch *watchs; + /* mutex of the listener */ + pthread_mutex_t mutex; + /* count of reference to the listener */ int refcount; }; @@ -65,6 +69,9 @@ struct afb_evt_event { /* id of the event */ int id; + /* mutex of the event */ + pthread_mutex_t mutex; + /* name of the event */ char name[1]; }; @@ -105,9 +112,11 @@ static struct afb_event_itf afb_evt_event_itf = { }; /* head of the list of listeners */ +static pthread_mutex_t listeners_mutex = PTHREAD_MUTEX_INITIALIZER; static struct afb_evt_listener *listeners = NULL; /* handling id of events */ +static pthread_mutex_t events_mutex = PTHREAD_MUTEX_INITIALIZER; static struct afb_evt_event *events = NULL; static int event_id_counter = 0; static int event_id_wrapped = 0; @@ -133,6 +142,7 @@ int afb_evt_broadcast(const char *event, struct json_object *object) struct afb_evt_listener *listener; result = 0; + pthread_mutex_lock(&listeners_mutex); listener = listeners; while(listener) { if (listener->itf->broadcast != NULL) { @@ -141,6 +151,7 @@ int afb_evt_broadcast(const char *event, struct json_object *object) } listener = listener->next; } + pthread_mutex_unlock(&listeners_mutex); json_object_put(object); return result; } @@ -157,6 +168,7 @@ static int evt_push(struct afb_evt_event *evt, struct json_object *obj) struct afb_evt_listener *listener; result = 0; + pthread_mutex_lock(&evt->mutex); watch = evt->watchs; while(watch) { listener = watch->listener; @@ -166,6 +178,7 @@ static int evt_push(struct afb_evt_event *evt, struct json_object *obj) watch = watch->next_by_event; result++; } + pthread_mutex_unlock(&evt->mutex); json_object_put(obj); return result; } @@ -214,25 +227,35 @@ static void remove_watch(struct afb_evt_watch *watch) */ static void evt_destroy(struct afb_evt_event *evt) { + int found; struct afb_evt_event **prv; + struct afb_evt_listener *listener; + if (evt != NULL) { - /* removes the event if valid! */ + /* unlinks the event if valid! */ + pthread_mutex_lock(&events_mutex); prv = &events; - while (*prv != NULL) { - if (*prv != evt) - prv = &(*prv)->next; - else { - /* valid, unlink */ - *prv = evt->next; - - /* removes all watchers */ - while(evt->watchs != NULL) - remove_watch(evt->watchs); - - /* free */ - free(evt); - break; + while (*prv && !(found = (*prv == evt))) + prv = &(*prv)->next; + if (found) + *prv = evt->next; + pthread_mutex_unlock(&events_mutex); + + /* destroys the event */ + if (found) { + /* removes all watchers */ + while(evt->watchs != NULL) { + listener = evt->watchs->listener; + pthread_mutex_lock(&listener->mutex); + pthread_mutex_lock(&evt->mutex); + remove_watch(evt->watchs); + pthread_mutex_unlock(&evt->mutex); + pthread_mutex_unlock(&listener->mutex); } + + /* free */ + pthread_mutex_destroy(&evt->mutex); + free(evt); } } } @@ -246,7 +269,18 @@ struct afb_event afb_evt_create_event(const char *name) size_t len; struct afb_evt_event *evt; + /* allocates the event */ + len = strlen(name); + evt = malloc(len + sizeof * evt); + if (evt == NULL) + goto error; + + /* initialize the event */ + evt->watchs = NULL; + memcpy(evt->name, name, len + 1); + /* allocates the id */ + pthread_mutex_lock(&events_mutex); do { if (++event_id_counter < 0) { event_id_wrapped = 1; @@ -259,19 +293,14 @@ struct afb_event afb_evt_create_event(const char *name) evt = evt->next; } while (evt != NULL); - /* allocates the event */ - len = strlen(name); - evt = malloc(len + sizeof * evt); - if (evt == NULL) - goto error; - /* initialize the event */ + memcpy(evt->name, name, len + 1); evt->next = events; evt->watchs = NULL; evt->id = event_id_counter; - assert(evt->id > 0); - memcpy(evt->name, name, len + 1); + pthread_mutex_init(&evt->mutex, NULL); events = evt; + pthread_mutex_unlock(&events_mutex); /* returns the event */ return (struct afb_event){ .itf = &afb_evt_event_itf, .closure = evt }; @@ -305,10 +334,13 @@ struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, struct afb_evt_listener *listener; /* search if an instance already exists */ + pthread_mutex_lock(&listeners_mutex); listener = listeners; while (listener != NULL) { - if (listener->itf == itf && listener->closure == closure) - return afb_evt_listener_addref(listener); + if (listener->itf == itf && listener->closure == closure) { + listener = afb_evt_listener_addref(listener); + goto found; + } listener = listener->next; } @@ -316,13 +348,16 @@ struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, listener = calloc(1, sizeof *listener); if (listener != NULL) { /* init */ - listener->next = listeners; listener->itf = itf; listener->closure = closure; listener->watchs = NULL; listener->refcount = 1; + pthread_mutex_init(&listener->mutex, NULL); + listener->next = listeners; listeners = listener; } + found: + pthread_mutex_unlock(&listeners_mutex); return listener; } @@ -331,7 +366,7 @@ struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, */ struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener) { - listener->refcount++; + __atomic_add_fetch(&listener->refcount, 1, __ATOMIC_RELAXED); return listener; } @@ -341,20 +376,31 @@ struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listen */ void afb_evt_listener_unref(struct afb_evt_listener *listener) { - if (0 == --listener->refcount) { - struct afb_evt_listener **prv; + struct afb_evt_listener **prv; + struct afb_evt_event *evt; - /* remove the watchers */ - while (listener->watchs != NULL) - remove_watch(listener->watchs); + if (!__atomic_sub_fetch(&listener->refcount, 1, __ATOMIC_RELAXED)) { /* unlink the listener */ + pthread_mutex_lock(&listeners_mutex); prv = &listeners; while (*prv != listener) prv = &(*prv)->next; *prv = listener->next; + pthread_mutex_unlock(&listeners_mutex); + + /* remove the watchers */ + pthread_mutex_lock(&listener->mutex); + while (listener->watchs != NULL) { + evt = listener->watchs->event; + pthread_mutex_lock(&evt->mutex); + remove_watch(listener->watchs); + pthread_mutex_unlock(&evt->mutex); + } + pthread_mutex_unlock(&listener->mutex); /* free the listener */ + pthread_mutex_destroy(&listener->mutex); free(listener); } } @@ -376,6 +422,7 @@ int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event) /* search the existing watch for the listener */ evt = event.closure; + pthread_mutex_lock(&listener->mutex); watch = listener->watchs; while(watch != NULL) { if (watch->event == evt) @@ -386,23 +433,27 @@ int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event) /* not found, allocate a new */ watch = malloc(sizeof *watch); if (watch == NULL) { + pthread_mutex_unlock(&listener->mutex); errno = ENOMEM; return -1; } /* initialise and link */ watch->event = evt; - watch->next_by_event = evt->watchs; + watch->activity = 0; watch->listener = listener; watch->next_by_listener = listener->watchs; - watch->activity = 0; - evt->watchs = watch; listener->watchs = watch; + pthread_mutex_lock(&evt->mutex); + watch->next_by_event = evt->watchs; + evt->watchs = watch; + pthread_mutex_unlock(&evt->mutex); found: if (watch->activity == 0 && listener->itf->add != NULL) listener->itf->add(listener->closure, evt->name, evt->id); watch->activity++; + pthread_mutex_unlock(&listener->mutex); return 0; } @@ -424,6 +475,7 @@ int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event eve /* search the existing watch */ evt = event.closure; + pthread_mutex_lock(&listener->mutex); watch = listener->watchs; while(watch != NULL) { if (watch->event == evt) { @@ -433,10 +485,12 @@ int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event eve if (watch->activity == 0 && listener->itf->remove != NULL) listener->itf->remove(listener->closure, evt->name, evt->id); } + pthread_mutex_unlock(&listener->mutex); return 0; } watch = watch->next_by_listener; } + pthread_mutex_unlock(&listener->mutex); errno = ENOENT; return -1; } diff --git a/src/afb-msg-json.c b/src/afb-msg-json.c index b4ae51b4..6d8f7327 100644 --- a/src/afb-msg-json.c +++ b/src/afb-msg-json.c @@ -29,15 +29,14 @@ struct json_object *afb_msg_json_reply(const char *status, const char *info, str { json_object *msg, *request; const char *token, *uuid; - static json_object *type_reply = NULL; + json_object *type_reply = NULL; msg = json_object_new_object(); if (resp != NULL) json_object_object_add(msg, "response", resp); - if (type_reply == NULL) - type_reply = json_object_new_string("afb-reply"); - json_object_object_add(msg, "jtype", json_object_get(type_reply)); + type_reply = json_object_new_string("afb-reply"); + json_object_object_add(msg, "jtype", type_reply); request = json_object_new_object(); json_object_object_add(msg, "request", request); @@ -75,7 +74,7 @@ struct json_object *afb_msg_json_reply_error(const char *status, const char *inf struct json_object *afb_msg_json_event(const char *event, struct json_object *object) { json_object *msg; - static json_object *type_event = NULL; + json_object *type_event = NULL; msg = json_object_new_object(); @@ -84,9 +83,8 @@ struct json_object *afb_msg_json_event(const char *event, struct json_object *ob if (object != NULL) json_object_object_add(msg, "data", object); - if (type_event == NULL) - type_event = json_object_new_string("afb-event"); - json_object_object_add(msg, "jtype", json_object_get(type_event)); + type_event = json_object_new_string("afb-event"); + json_object_object_add(msg, "jtype", type_event); return msg; } diff --git a/src/afb-ws.c b/src/afb-ws.c index cc852b20..c6126100 100644 --- a/src/afb-ws.c +++ b/src/afb-ws.c @@ -407,7 +407,7 @@ static int aws_read(struct afb_ws *ws, size_t size) return 0; pfd.fd = ws->fd; pfd.events = POLLIN; - poll(&pfd, 1, 10); + poll(&pfd, 1, 10); /* TODO: make fully asynchronous websockets */ } else { ws->buffer.size += (size_t)sz; size -= (size_t)sz; diff --git a/src/sig-monitor.c b/src/sig-monitor.c index d00f0f97..89fd4444 100644 --- a/src/sig-monitor.c +++ b/src/sig-monitor.c @@ -115,6 +115,8 @@ static void on_signal_error(int signum) { sigset_t sigset; + ERROR("ALERT! signal %d received: %s", signum, strsignal(signum)); + // unlock signal to allow a new signal to come if (error_handler != NULL) { sigemptyset(&sigset); -- 2.16.6