#include <errno.h>
#include <string.h>
#include <stdio.h>
+#include <pthread.h>
#include <json-c/json.h>
struct afb_ws *ws;
struct afb_wsj1_msg *messages;
struct wsj1_call *calls;
+ pthread_mutex_t mutex;
};
struct afb_wsj1 *afb_wsj1_create(struct sd_event *eloop, int fd, struct afb_wsj1_itf *itf, void *closure)
{
struct afb_wsj1 *result;
+ assert(eloop);
assert(fd >= 0);
+ assert(itf);
+ assert(itf->on_call);
result = calloc(1, sizeof * result);
if (result == NULL)
result->refcount = 1;
result->itf = itf;
result->closure = closure;
+ pthread_mutex_init(&result->mutex, NULL);
result->tokener = json_tokener_new();
if (result->tokener == NULL)
void afb_wsj1_addref(struct afb_wsj1 *wsj1)
{
- if (wsj1 != NULL)
- wsj1->refcount++;
+ if (wsj1)
+ __atomic_add_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED);
}
void afb_wsj1_unref(struct afb_wsj1 *wsj1)
{
- if (wsj1 != NULL && !--wsj1->refcount) {
+ if (wsj1 && !__atomic_sub_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED)) {
afb_ws_destroy(wsj1->ws);
json_tokener_free(wsj1->tokener);
free(wsj1);
}
-static struct wsj1_call *wsj1_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
+static struct wsj1_call *wsj1_locked_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
{
struct wsj1_call *r, **p;
+
p = &wsj1->calls;
while((r = *p) != NULL) {
if (strcmp(r->id, id) == 0) {
}
p = &r->next;
}
+
+ return r;
+}
+
+static struct wsj1_call *wsj1_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
+{
+ struct wsj1_call *r;
+
+ pthread_mutex_lock(&wsj1->mutex);
+ r = wsj1_locked_call_search(wsj1, id, remove);
+ pthread_mutex_unlock(&wsj1->mutex);
+
return r;
}
if (call == NULL)
errno = ENOMEM;
else {
+ pthread_mutex_lock(&wsj1->mutex);
do {
if (wsj1->genid == 0)
wsj1->genid = 999999;
sprintf(call->id, "%d", wsj1->genid--);
- } while (wsj1_call_search(wsj1, call->id, 0) != NULL);
+ } while (wsj1_locked_call_search(wsj1, call->id, 0) != NULL);
call->callback = on_reply;
call->closure = closure;
call->next = wsj1->calls;
wsj1->calls = call;
+ pthread_mutex_unlock(&wsj1->mutex);
}
return call;
}
msg->refcount = 1;
afb_wsj1_addref(wsj1);
msg->wsj1 = wsj1;
+ pthread_mutex_lock(&wsj1->mutex);
msg->next = wsj1->messages;
if (msg->next != NULL)
msg->next->previous = msg;
wsj1->messages = msg;
+ pthread_mutex_unlock(&wsj1->mutex);
/* incoke the handler */
switch (msg->code) {
free(call);
break;
case EVENT:
- wsj1->itf->on_event(wsj1->closure, msg->event, msg);
+ if (wsj1->itf->on_event != NULL)
+ wsj1->itf->on_event(wsj1->closure, msg->event, msg);
break;
}
afb_wsj1_msg_unref(msg);
void afb_wsj1_msg_addref(struct afb_wsj1_msg *msg)
{
if (msg != NULL)
- msg->refcount++;
+ __atomic_add_fetch(&msg->refcount, 1, __ATOMIC_RELAXED);
}
void afb_wsj1_msg_unref(struct afb_wsj1_msg *msg)
{
- if (msg != NULL && --msg->refcount == 0) {
+ if (msg != NULL && !__atomic_sub_fetch(&msg->refcount, 1, __ATOMIC_RELAXED)) {
/* unlink the message */
+ pthread_mutex_lock(&msg->wsj1->mutex);
if (msg->next != NULL)
msg->next->previous = msg->previous;
if (msg->previous == NULL)
msg->wsj1->messages = msg->next;
else
msg->previous->next = msg->next;
+ pthread_mutex_unlock(&msg->wsj1->mutex);
/* free ressources */
afb_wsj1_unref(msg->wsj1);
json_object_put(msg->object_j);
{
struct json_object *object = msg->object_j;
if (object == NULL) {
+ pthread_mutex_lock(&msg->wsj1->mutex);
json_tokener_reset(msg->wsj1->tokener);
object = json_tokener_parse_ex(msg->wsj1->tokener, msg->object_s, (int)msg->object_s_length);
+ pthread_mutex_unlock(&msg->wsj1->mutex);
if (object == NULL) {
/* lazy error detection of json request. Is it to improve? */
object = json_object_new_string_len(msg->object_s, (int)msg->object_s_length);