#include "afb-ws.h"
#include "afb-msg-json.h"
#include "afb-proto-ws.h"
+#include "jobs.h"
+#include "fdev.h"
struct afb_proto_ws;
int refcount;
/* file descriptor */
- int fd;
+ struct fdev *fdev;
/* resource control */
pthread_mutex_t mutex;
void (*on_hangup)(void *closure);
};
+/******************* streaming objects **********************************/
+
+#define WRITEBUF_COUNT_MAX 32
+struct writebuf
+{
+ struct iovec iovec[WRITEBUF_COUNT_MAX];
+ uint32_t uints[WRITEBUF_COUNT_MAX];
+ int count;
+};
+
+struct readbuf
+{
+ char *base, *head, *end;
+};
+
+struct binary
+{
+ struct afb_proto_ws *protows;
+ struct readbuf rb;
+};
+
/******************* common useful tools **********************************/
/**
/******************* serialisation part **********************************/
-struct readbuf
-{
- char *base, *head, *end;
-};
-
-#define WRITEBUF_COUNT_MAX 32
-struct writebuf
-{
- struct iovec iovec[WRITEBUF_COUNT_MAX];
- uint32_t uints[WRITEBUF_COUNT_MAX];
- int count;
-};
-
static char *readbuf_get(struct readbuf *rb, uint32_t length)
{
char *before = rb->head;
}
/* callback when receiving binary data */
-static void client_on_binary(void *closure, char *data, size_t size)
+static void client_on_binary_job(int sig, void *closure)
{
- struct afb_proto_ws *protows;
- struct readbuf rb;
-
- rb.base = data;
- if (size > 0) {
- rb.head = data;
- rb.end = data + size;
- protows = closure;
+ struct binary *binary = closure;
- switch (*rb.head++) {
+ if (!sig) {
+ switch (*binary->rb.head++) {
case CHAR_FOR_ANSWER_SUCCESS: /* success */
- client_on_reply_success(protows, &rb);
+ client_on_reply_success(binary->protows, &binary->rb);
break;
case CHAR_FOR_ANSWER_FAIL: /* fail */
- client_on_reply_fail(protows, &rb);
+ client_on_reply_fail(binary->protows, &binary->rb);
break;
case CHAR_FOR_EVT_BROADCAST: /* broadcast */
- client_on_event_broadcast(protows, &rb);
+ client_on_event_broadcast(binary->protows, &binary->rb);
break;
case CHAR_FOR_EVT_ADD: /* creates the event */
- client_on_event_create(protows, &rb);
+ client_on_event_create(binary->protows, &binary->rb);
break;
case CHAR_FOR_EVT_DEL: /* removes the event */
- client_on_event_remove(protows, &rb);
+ client_on_event_remove(binary->protows, &binary->rb);
break;
case CHAR_FOR_EVT_PUSH: /* pushs the event */
- client_on_event_push(protows, &rb);
+ client_on_event_push(binary->protows, &binary->rb);
break;
case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
- client_on_event_subscribe(protows, &rb);
+ client_on_event_subscribe(binary->protows, &binary->rb);
break;
case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
- client_on_event_unsubscribe(protows, &rb);
+ client_on_event_unsubscribe(binary->protows, &binary->rb);
break;
case CHAR_FOR_SUBCALL_CALL: /* subcall */
- client_on_subcall(protows, &rb);
+ client_on_subcall(binary->protows, &binary->rb);
break;
case CHAR_FOR_DESCRIPTION: /* description */
- client_on_description(protows, &rb);
+ client_on_description(binary->protows, &binary->rb);
break;
default: /* unexpected message */
/* TODO: close the connection */
break;
}
}
- free(rb.base);
+ free(binary->rb.base);
+ free(binary);
+}
+
+/* callback when receiving binary data */
+static void client_on_binary(void *closure, char *data, size_t size)
+{
+ int rc;
+ struct binary *binary;
+
+ if (size) {
+ binary = malloc(sizeof *binary);
+ if (!binary) {
+ errno = ENOMEM;
+ } else {
+ binary->protows = closure;
+ binary->rb.base = data;
+ binary->rb.head = data;
+ binary->rb.end = data + size;
+ rc = jobs_queue(NULL, 0, client_on_binary_job, binary);
+ if (rc >= 0)
+ return;
+ free(binary);
+ }
+ }
+ free(data);
}
int afb_proto_ws_client_call(
}
/* callback when receiving binary data */
-static void server_on_binary(void *closure, char *data, size_t size)
+static void server_on_binary_job(int sig, void *closure)
{
- struct afb_proto_ws *protows;
- struct readbuf rb;
-
- rb.base = data;
- if (size > 0) {
- rb.head = data;
- rb.end = data + size;
- protows = closure;
+ struct binary *binary = closure;
- switch (*rb.head++) {
+ if (!sig) {
+ switch (*binary->rb.head++) {
case CHAR_FOR_CALL:
- server_on_call(protows, &rb);
+ server_on_call(binary->protows, &binary->rb);
break;
case CHAR_FOR_SUBCALL_REPLY:
- server_on_subcall_reply(protows, &rb);
+ server_on_subcall_reply(binary->protows, &binary->rb);
break;
case CHAR_FOR_DESCRIBE:
- server_on_describe(protows, &rb);
+ server_on_describe(binary->protows, &binary->rb);
break;
default: /* unexpected message */
/* TODO: close the connection */
break;
}
}
- free(rb.base);
+ free(binary->rb.base);
+ free(binary);
+}
+
+static void server_on_binary(void *closure, char *data, size_t size)
+{
+ int rc;
+ struct binary *binary;
+
+ if (size) {
+ binary = malloc(sizeof *binary);
+ if (!binary) {
+ errno = ENOMEM;
+ } else {
+ binary->protows = closure;
+ binary->rb.base = data;
+ binary->rb.head = data;
+ binary->rb.end = data + size;
+ rc = jobs_queue(NULL, 0, server_on_binary_job, binary);
+ if (rc >= 0)
+ return;
+ free(binary);
+ }
+ }
+ free(data);
}
/******************* server part: manage events **********************************/
free(cd);
}
- if (protows->fd >= 0) {
- close(protows->fd);
- protows->fd = -1;
+ if (protows->fdev) {
+ fdev_unref(protows->fdev);
+ protows->fdev = 0;
if (protows->on_hangup)
protows->on_hangup(protows->closure);
}
/*****************************************************/
-static struct afb_proto_ws *afb_proto_ws_create(struct sd_event *eloop, int fd, const struct afb_proto_ws_server_itf *itfs, const struct afb_proto_ws_client_itf *itfc, void *closure, const struct afb_ws_itf *itf)
+static struct afb_proto_ws *afb_proto_ws_create(struct fdev *fdev, const struct afb_proto_ws_server_itf *itfs, const struct afb_proto_ws_client_itf *itfc, void *closure, const struct afb_ws_itf *itf)
{
struct afb_proto_ws *protows;
if (protows == NULL)
errno = ENOMEM;
else {
- fcntl(fd, F_SETFD, FD_CLOEXEC);
- fcntl(fd, F_SETFL, O_NONBLOCK);
- protows->ws = afb_ws_create(eloop, fd, itf, protows);
+ fcntl(fdev_fd(fdev), F_SETFD, FD_CLOEXEC);
+ fcntl(fdev_fd(fdev), F_SETFL, O_NONBLOCK);
+ protows->ws = afb_ws_create(fdev, itf, protows);
if (protows->ws != NULL) {
- protows->fd = fd;
+ protows->fdev = fdev;
protows->refcount = 1;
protows->subcalls = NULL;
protows->closure = closure;
return NULL;
}
-struct afb_proto_ws *afb_proto_ws_create_client(struct sd_event *eloop, int fd, const struct afb_proto_ws_client_itf *itf, void *closure)
+struct afb_proto_ws *afb_proto_ws_create_client(struct fdev *fdev, const struct afb_proto_ws_client_itf *itf, void *closure)
{
- return afb_proto_ws_create(eloop, fd, NULL, itf, closure, &proto_ws_client_ws_itf);
+ return afb_proto_ws_create(fdev, NULL, itf, closure, &proto_ws_client_ws_itf);
}
-struct afb_proto_ws *afb_proto_ws_create_server(struct sd_event *eloop, int fd, const struct afb_proto_ws_server_itf *itf, void *closure)
+struct afb_proto_ws *afb_proto_ws_create_server(struct fdev *fdev, const struct afb_proto_ws_server_itf *itf, void *closure)
{
- return afb_proto_ws_create(eloop, fd, itf, NULL, closure, &server_ws_itf);
+ return afb_proto_ws_create(fdev, itf, NULL, closure, &server_ws_itf);
}
void afb_proto_ws_unref(struct afb_proto_ws *protows)