Update date of copyright notices
[src/app-framework-binder.git] / src / afb-proto-ws.c
index 90c3b05..e1993ab 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2015, 2016, 2017 "IoT.bzh"
+ * Copyright (C) 2015-2018 "IoT.bzh"
  * Author José Bollo <jose.bollo@iot.bzh>
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -37,6 +37,8 @@
 #include "afb-ws.h"
 #include "afb-msg-json.h"
 #include "afb-proto-ws.h"
+#include "jobs.h"
+#include "fdev.h"
 
 struct afb_proto_ws;
 
@@ -160,7 +162,7 @@ struct afb_proto_ws
        int refcount;
 
        /* file descriptor */
-       int fd;
+       struct fdev *fdev;
 
        /* resource control */
        pthread_mutex_t mutex;
@@ -190,6 +192,27 @@ struct afb_proto_ws
        void (*on_hangup)(void *closure);
 };
 
+/******************* streaming objects **********************************/
+
+#define WRITEBUF_COUNT_MAX  32
+struct writebuf
+{
+       struct iovec iovec[WRITEBUF_COUNT_MAX];
+       uint32_t uints[WRITEBUF_COUNT_MAX];
+       int count;
+};
+
+struct readbuf
+{
+       char *base, *head, *end;
+};
+
+struct binary
+{
+       struct afb_proto_ws *protows;
+       struct readbuf rb;
+};
+
 /******************* common useful tools **********************************/
 
 /**
@@ -204,19 +227,6 @@ static inline uint32_t ptr2id(void *ptr)
 
 /******************* serialisation part **********************************/
 
-struct readbuf
-{
-       char *base, *head, *end;
-};
-
-#define WRITEBUF_COUNT_MAX  32
-struct writebuf
-{
-       struct iovec iovec[WRITEBUF_COUNT_MAX];
-       uint32_t uints[WRITEBUF_COUNT_MAX];
-       int count;
-};
-
 static char *readbuf_get(struct readbuf *rb, uint32_t length)
 {
        char *before = rb->head;
@@ -735,54 +745,73 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf *
 }
 
 /* callback when receiving binary data */
-static void client_on_binary(void *closure, char *data, size_t size)
+static void client_on_binary_job(int sig, void *closure)
 {
-       struct afb_proto_ws *protows;
-       struct readbuf rb;
-
-       rb.base = data;
-       if (size > 0) {
-               rb.head = data;
-               rb.end = data + size;
-               protows = closure;
+       struct binary *binary = closure;
 
-               switch (*rb.head++) {
+       if (!sig) {
+               switch (*binary->rb.head++) {
                case CHAR_FOR_ANSWER_SUCCESS: /* success */
-                       client_on_reply_success(protows, &rb);
+                       client_on_reply_success(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_ANSWER_FAIL: /* fail */
-                       client_on_reply_fail(protows, &rb);
+                       client_on_reply_fail(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_BROADCAST: /* broadcast */
-                       client_on_event_broadcast(protows, &rb);
+                       client_on_event_broadcast(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_ADD: /* creates the event */
-                       client_on_event_create(protows, &rb);
+                       client_on_event_create(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_DEL: /* removes the event */
-                       client_on_event_remove(protows, &rb);
+                       client_on_event_remove(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_PUSH: /* pushs the event */
-                       client_on_event_push(protows, &rb);
+                       client_on_event_push(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
-                       client_on_event_subscribe(protows, &rb);
+                       client_on_event_subscribe(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
-                       client_on_event_unsubscribe(protows, &rb);
+                       client_on_event_unsubscribe(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_SUBCALL_CALL: /* subcall */
-                       client_on_subcall(protows, &rb);
+                       client_on_subcall(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_DESCRIPTION: /* description */
-                       client_on_description(protows, &rb);
+                       client_on_description(binary->protows, &binary->rb);
                        break;
                default: /* unexpected message */
                        /* TODO: close the connection */
                        break;
                }
        }
-       free(rb.base);
+       free(binary->rb.base);
+       free(binary);
+}
+
+/* callback when receiving binary data */
+static void client_on_binary(void *closure, char *data, size_t size)
+{
+       int rc;
+       struct binary *binary;
+
+       if (size) {
+               binary = malloc(sizeof *binary);
+               if (!binary) {
+                       errno = ENOMEM;
+               } else {
+                       binary->protows = closure;
+                       binary->rb.base = data;
+                       binary->rb.head = data;
+                       binary->rb.end = data + size;
+                       rc = jobs_queue(NULL, 0, client_on_binary_job, binary);
+                       if (rc >= 0)
+                               return;
+                       free(binary);
+               }
+       }
+       free(data);
 }
 
 int afb_proto_ws_client_call(
@@ -1017,33 +1046,51 @@ static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb)
 }
 
 /* callback when receiving binary data */
-static void server_on_binary(void *closure, char *data, size_t size)
+static void server_on_binary_job(int sig, void *closure)
 {
-       struct afb_proto_ws *protows;
-       struct readbuf rb;
-
-       rb.base = data;
-       if (size > 0) {
-               rb.head = data;
-               rb.end = data + size;
-               protows = closure;
+       struct binary *binary = closure;
 
-               switch (*rb.head++) {
+       if (!sig) {
+               switch (*binary->rb.head++) {
                case CHAR_FOR_CALL:
-                       server_on_call(protows, &rb);
+                       server_on_call(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_SUBCALL_REPLY:
-                       server_on_subcall_reply(protows, &rb);
+                       server_on_subcall_reply(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_DESCRIBE:
-                       server_on_describe(protows, &rb);
+                       server_on_describe(binary->protows, &binary->rb);
                        break;
                default: /* unexpected message */
                        /* TODO: close the connection */
                        break;
                }
        }
-       free(rb.base);
+       free(binary->rb.base);
+       free(binary);
+}
+
+static void server_on_binary(void *closure, char *data, size_t size)
+{
+       int rc;
+       struct binary *binary;
+
+       if (size) {
+               binary = malloc(sizeof *binary);
+               if (!binary) {
+                       errno = ENOMEM;
+               } else {
+                       binary->protows = closure;
+                       binary->rb.base = data;
+                       binary->rb.head = data;
+                       binary->rb.end = data + size;
+                       rc = jobs_queue(NULL, 0, server_on_binary_job, binary);
+                       if (rc >= 0)
+                               return;
+                       free(binary);
+               }
+       }
+       free(data);
 }
 
 /******************* server part: manage events **********************************/
@@ -1111,9 +1158,9 @@ static void on_hangup(void *closure)
                free(cd);
        }
 
-       if (protows->fd >= 0) {
-               close(protows->fd);
-               protows->fd = -1;
+       if (protows->fdev) {
+               fdev_unref(protows->fdev);
+               protows->fdev = 0;
                if (protows->on_hangup)
                        protows->on_hangup(protows->closure);
        }
@@ -1141,7 +1188,7 @@ static const struct afb_ws_itf server_ws_itf =
 
 /*****************************************************/
 
-static struct afb_proto_ws *afb_proto_ws_create(struct sd_event *eloop, int fd, const struct afb_proto_ws_server_itf *itfs, const struct afb_proto_ws_client_itf *itfc, void *closure, const struct afb_ws_itf *itf)
+static struct afb_proto_ws *afb_proto_ws_create(struct fdev *fdev, const struct afb_proto_ws_server_itf *itfs, const struct afb_proto_ws_client_itf *itfc, void *closure, const struct afb_ws_itf *itf)
 {
        struct afb_proto_ws *protows;
 
@@ -1149,11 +1196,11 @@ static struct afb_proto_ws *afb_proto_ws_create(struct sd_event *eloop, int fd,
        if (protows == NULL)
                errno = ENOMEM;
        else {
-               fcntl(fd, F_SETFD, FD_CLOEXEC);
-               fcntl(fd, F_SETFL, O_NONBLOCK);
-               protows->ws = afb_ws_create(eloop, fd, itf, protows);
+               fcntl(fdev_fd(fdev), F_SETFD, FD_CLOEXEC);
+               fcntl(fdev_fd(fdev), F_SETFL, O_NONBLOCK);
+               protows->ws = afb_ws_create(fdev, itf, protows);
                if (protows->ws != NULL) {
-                       protows->fd = fd;
+                       protows->fdev = fdev;
                        protows->refcount = 1;
                        protows->subcalls = NULL;
                        protows->closure = closure;
@@ -1167,14 +1214,14 @@ static struct afb_proto_ws *afb_proto_ws_create(struct sd_event *eloop, int fd,
        return NULL;
 }
 
-struct afb_proto_ws *afb_proto_ws_create_client(struct sd_event *eloop, int fd, const struct afb_proto_ws_client_itf *itf, void *closure)
+struct afb_proto_ws *afb_proto_ws_create_client(struct fdev *fdev, const struct afb_proto_ws_client_itf *itf, void *closure)
 {
-       return afb_proto_ws_create(eloop, fd, NULL, itf, closure, &proto_ws_client_ws_itf);
+       return afb_proto_ws_create(fdev, NULL, itf, closure, &proto_ws_client_ws_itf);
 }
 
-struct afb_proto_ws *afb_proto_ws_create_server(struct sd_event *eloop, int fd, const struct afb_proto_ws_server_itf *itf, void *closure)
+struct afb_proto_ws *afb_proto_ws_create_server(struct fdev *fdev, const struct afb_proto_ws_server_itf *itf, void *closure)
 {
-       return afb_proto_ws_create(eloop, fd, itf, NULL, closure, &server_ws_itf);
+       return afb_proto_ws_create(fdev, itf, NULL, closure, &server_ws_itf);
 }
 
 void afb_proto_ws_unref(struct afb_proto_ws *protows)