From 5eaa2c12a8b89f8a16f0759db88d65b56c82918c Mon Sep 17 00:00:00 2001 From: =?utf8?q?Jos=C3=A9=20Bollo?= Date: Thu, 21 Apr 2016 17:45:44 +0200 Subject: [PATCH] websocket refactoring MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Change-Id: Ia39ec6c01ce1fc6b3921b0433ab872d47ebdbbc4 Signed-off-by: José Bollo --- src/afb-ws.c | 320 +++++++++++++++++++++++++++++++++++++++++++--------------- src/afb-ws.h | 13 ++- src/websock.c | 21 +++- src/websock.h | 11 +- 4 files changed, 273 insertions(+), 92 deletions(-) diff --git a/src/afb-ws.c b/src/afb-ws.c index 2ebfbb93..da248c89 100644 --- a/src/afb-ws.c +++ b/src/afb-ws.c @@ -29,6 +29,9 @@ #include "utils-upoll.h" +/* + * declaration of the websock interface for afb-ws + */ static ssize_t aws_writev(struct afb_ws *ws, const struct iovec *iov, int iovcnt); static ssize_t aws_readv(struct afb_ws *ws, const struct iovec *iov, int iovcnt); static void aws_on_close(struct afb_ws *ws, uint16_t code, size_t size); @@ -36,7 +39,7 @@ static void aws_on_text(struct afb_ws *ws, int last, size_t size); static void aws_on_binary(struct afb_ws *ws, int last, size_t size); static void aws_on_continue(struct afb_ws *ws, int last, size_t size); static void aws_on_readable(struct afb_ws *ws); -static void aws_on_hangup(struct afb_ws *ws); +static void aws_on_error(struct afb_ws *ws, uint16_t code, const void *data, size_t size); static struct websock_itf aws_itf = { .writev = (void*)aws_writev, @@ -48,54 +51,113 @@ static struct websock_itf aws_itf = { .on_text = (void*)aws_on_text, .on_binary = (void*)aws_on_binary, .on_continue = (void*)aws_on_continue, - .on_extension = NULL + .on_extension = NULL, + + .on_error = (void*)aws_on_error }; +/* + * a common scheme of buffer handling + */ struct buf { char *buffer; size_t size; }; +/* + * the state + */ +enum state +{ + waiting, + reading_text, + reading_binary +}; + +/* + * the afb_ws structure + */ struct afb_ws { - int fd; - enum { none, text, binary } type; - const struct afb_ws_itf *itf; - void *closure; - struct websock *ws; - struct upoll *up; - struct buf buffer; + int fd; /* the socket file descriptor */ + enum state state; /* current state */ + const struct afb_ws_itf *itf; /* the callback interface */ + void *closure; /* closure when calling the callbacks */ + struct websock *ws; /* the websock handler */ + struct upoll *up; /* the upoll handler for the socket */ + struct buf buffer; /* the last read fragment */ }; +/* + * Returns the current buffer of 'ws' that is reset. + */ +static inline struct buf aws_pick_buffer(struct afb_ws *ws) +{ + struct buf result = ws->buffer; + ws->buffer.buffer = NULL; + ws->buffer.size = 0; + return result; +} + +/* + * Disconnect the websocket 'ws' and calls on_hangup if + * 'call_on_hangup' is not null. + */ +static void aws_disconnect(struct afb_ws *ws, int call_on_hangup) +{ + struct websock *wsi = ws->ws; + if (wsi != NULL) { + ws->ws = NULL; + upoll_close(ws->up); + ws->up = NULL; + websock_destroy(wsi); + free(aws_pick_buffer(ws).buffer); + ws->state = waiting; + if (call_on_hangup && ws->itf->on_hangup) + ws->itf->on_hangup(ws->closure); + } +} + +/* + * Creates the afb_ws structure for the file descritor + * 'fd' and the callbacks described by the interface 'itf' + * and its 'closure'. + * + * Returns the handle for the afb_ws created or NULL on error. + */ struct afb_ws *afb_ws_create(int fd, const struct afb_ws_itf *itf, void *closure) { struct afb_ws *result; assert(fd >= 0); + /* allocation */ result = malloc(sizeof * result); if (result == NULL) goto error; + /* init */ result->fd = fd; - result->type = none; + result->state = waiting; result->itf = itf; result->closure = closure; + result->buffer.buffer = NULL; + result->buffer.size = 0; + /* creates the websocket */ result->ws = websock_create_v13(&aws_itf, result); if (result->ws == NULL) goto error2; + /* creates the upoll */ result->up = upoll_open(result->fd, result); if (result->up == NULL) goto error3; - result->buffer.buffer = NULL; - result->buffer.size = 0; - + /* init the upoll */ upoll_on_readable(result->up, (void*)aws_on_readable); - upoll_on_hangup(result->up, (void*)aws_on_hangup); + upoll_on_hangup(result->up, (void*)afb_ws_hangup); return result; @@ -104,35 +166,92 @@ error3: error2: free(result); error: - close(fd); return NULL; } -void afb_ws_disconnect(struct afb_ws *ws) +/* + * Destroys the websocket 'ws' + * It first hangup (but without calling on_hangup for safety reasons) + * if needed. + */ +void afb_ws_destroy(struct afb_ws *ws) { - struct upoll *up = ws->up; - struct websock *wsi = ws->ws; - ws->up = NULL; - ws->ws = NULL; - upoll_close(up); - websock_destroy(wsi); + aws_disconnect(ws, 0); + free(ws); } -void afb_ws_close(struct afb_ws *ws, uint16_t code) +/* + * Hangup the websocket 'ws' + */ +void afb_ws_hangup(struct afb_ws *ws) +{ + aws_disconnect(ws, 1); +} + +/* + * Sends a 'close' command to the endpoint of 'ws' with the 'code' and the + * 'reason' (that can be NULL and that else should not be greater than 123 + * characters). + * Returns 0 on success or -1 in case of error. + */ +int afb_ws_close(struct afb_ws *ws, uint16_t code, const char *reason) { - websock_close_code(ws->ws, code, NULL, 0); + if (ws->ws == NULL) { + /* disconnected */ + errno = EPIPE; + return -1; + } + return websock_close(ws->ws, code, reason, reason == NULL ? 0 : strlen(reason)); } -void afb_ws_text(struct afb_ws *ws, const char *text, size_t length) +/* + * Sends a 'close' command to the endpoint of 'ws' with the 'code' and the + * 'reason' (that can be NULL and that else should not be greater than 123 + * characters). + * Raise an error after 'close' command is sent. + * Returns 0 on success or -1 in case of error. + */ +int afb_ws_error(struct afb_ws *ws, uint16_t code, const char *reason) { - websock_text(ws->ws, 1, text, length); + if (ws->ws == NULL) { + /* disconnected */ + errno = EPIPE; + return -1; + } + return websock_error(ws->ws, code, reason, reason == NULL ? 0 : strlen(reason)); } -void afb_ws_binary(struct afb_ws *ws, const void *data, size_t length) +/* + * Sends a 'text' of 'length' to the endpoint of 'ws'. + * Returns 0 on success or -1 in case of error. + */ +int afb_ws_text(struct afb_ws *ws, const char *text, size_t length) { - websock_binary(ws->ws, 1, data, length); + if (ws->ws == NULL) { + /* disconnected */ + errno = EPIPE; + return -1; + } + return websock_text(ws->ws, 1, text, length); } +/* + * Sends a binary 'data' of 'length' to the endpoint of 'ws'. + * Returns 0 on success or -1 in case of error. + */ +int afb_ws_binary(struct afb_ws *ws, const void *data, size_t length) +{ + if (ws->ws == NULL) { + /* disconnected */ + errno = EPIPE; + return -1; + } + return websock_binary(ws->ws, 1, data, length); +} + +/* + * callback for writing data + */ static ssize_t aws_writev(struct afb_ws *ws, const struct iovec *iov, int iovcnt) { ssize_t rc; @@ -142,6 +261,9 @@ static ssize_t aws_writev(struct afb_ws *ws, const struct iovec *iov, int iovcnt return rc; } +/* + * callback for reading data + */ static ssize_t aws_readv(struct afb_ws *ws, const struct iovec *iov, int iovcnt) { ssize_t rc; @@ -155,25 +277,24 @@ static ssize_t aws_readv(struct afb_ws *ws, const struct iovec *iov, int iovcnt) return rc; } +/* + * callback on incoming data + */ static void aws_on_readable(struct afb_ws *ws) { - if (websock_dispatch(ws->ws) < 0 && errno == EPIPE) - afb_ws_disconnect(ws); -} + int rc; -static void aws_on_hangup(struct afb_ws *ws) -{ - afb_ws_disconnect(ws); -} - -static inline struct buf aws_pick_buffer(struct afb_ws *ws) -{ - struct buf result = ws->buffer; - ws->buffer.buffer = NULL; - ws->buffer.size = 0; - return result; + assert(ws->ws != NULL); + rc = websock_dispatch(ws->ws); + if (rc < 0 && errno == EPIPE) + afb_ws_hangup(ws); } +/* + * Reads from the websocket handled by 'ws' data of length 'size' + * and append it to the current buffer of 'ws'. + * Returns 0 in case of error or 1 in case of success. + */ static int aws_read(struct afb_ws *ws, size_t size) { ssize_t sz; @@ -192,66 +313,107 @@ static int aws_read(struct afb_ws *ws, size_t size) return 1; } +/* + * Callback when 'close' command received from 'ws' with 'code' and 'size'. + */ static void aws_on_close(struct afb_ws *ws, uint16_t code, size_t size) { struct buf b; - ws->type = none; - if (ws->itf->on_close == NULL) + ws->state = waiting; + free(aws_pick_buffer(ws).buffer); + if (ws->itf->on_close == NULL) { websock_drop(ws->ws); + afb_ws_hangup(ws); + } else if (!aws_read(ws, size)) + ws->itf->on_close(ws->closure, code, NULL, 0); else { - aws_read(ws, size); b = aws_pick_buffer(ws); ws->itf->on_close(ws->closure, code, b.buffer, b.size); } } +/* + * Drops any incoming data and send an error of 'code' + */ +static void aws_drop_error(struct afb_ws *ws, uint16_t code) +{ + ws->state = waiting; + free(aws_pick_buffer(ws).buffer); + websock_drop(ws->ws); + websock_error(ws->ws, code, NULL, 0); +} + +/* + * Reads either text or binary data of 'size' from 'ws' eventually 'last'. + */ +static void aws_continue(struct afb_ws *ws, int last, size_t size) +{ + struct buf b; + int istxt; + + if (!aws_read(ws, size)) + aws_drop_error(ws, WEBSOCKET_CODE_ABNORMAL); + else if (last) { + istxt = ws->state == reading_text; + ws->state = waiting; + b = aws_pick_buffer(ws); + b.buffer[b.size] = 0; + (istxt ? ws->itf->on_text : ws->itf->on_binary)(ws->closure, b.buffer, b.size); + } +} + +/* + * Callback when 'text' message received from 'ws' with 'size' and possibly 'last'. + */ static void aws_on_text(struct afb_ws *ws, int last, size_t size) { - if (ws->type != none) { - websock_drop(ws->ws); - websock_close_code(ws->ws, WEBSOCKET_CODE_PROTOCOL_ERROR, NULL, 0); - } else if (ws->itf->on_text == NULL) { - websock_drop(ws->ws); - websock_close_code(ws->ws, WEBSOCKET_CODE_CANT_ACCEPT, NULL, 0); - } else { - ws->type = text; - aws_on_continue(ws, last, size); + if (ws->state != waiting) + aws_drop_error(ws, WEBSOCKET_CODE_PROTOCOL_ERROR); + else if (ws->itf->on_text == NULL) + aws_drop_error(ws, WEBSOCKET_CODE_CANT_ACCEPT); + else { + ws->state = reading_text; + aws_continue(ws, last, size); } } +/* + * Callback when 'binary' message received from 'ws' with 'size' and possibly 'last'. + */ static void aws_on_binary(struct afb_ws *ws, int last, size_t size) { - if (ws->type != none) { - websock_drop(ws->ws); - websock_close_code(ws->ws, WEBSOCKET_CODE_PROTOCOL_ERROR, NULL, 0); - } else if (ws->itf->on_binary == NULL) { - websock_drop(ws->ws); - websock_close_code(ws->ws, WEBSOCKET_CODE_CANT_ACCEPT, NULL, 0); - } else { - ws->type = text; - aws_on_continue(ws, last, size); + if (ws->state != waiting) + aws_drop_error(ws, WEBSOCKET_CODE_PROTOCOL_ERROR); + else if (ws->itf->on_binary == NULL) + aws_drop_error(ws, WEBSOCKET_CODE_CANT_ACCEPT); + else { + ws->state = reading_binary; + aws_continue(ws, last, size); } } +/* + * Callback when 'close' command received from 'ws' with 'code' and 'size'. + */ static void aws_on_continue(struct afb_ws *ws, int last, size_t size) { - struct buf b; - int istxt; + if (ws->state == waiting) + aws_drop_error(ws, WEBSOCKET_CODE_PROTOCOL_ERROR); + else + aws_continue(ws, last, size); +} - if (ws->type == none) { - websock_drop(ws->ws); - websock_close_code(ws->ws, WEBSOCKET_CODE_PROTOCOL_ERROR, NULL, 0); - } else { - if (!aws_read(ws, size)) { - aws_on_close(ws, WEBSOCKET_CODE_ABNORMAL, 0); - } else if (last) { - istxt = ws->type == text; - ws->type = none; - b = aws_pick_buffer(ws); - b.buffer[b.size] = 0; - (istxt ? ws->itf->on_text : ws->itf->on_binary)(ws->closure, b.buffer, b.size); - } - } +/* + * Callback when 'close' command is sent to 'ws' with 'code' and 'size'. + */ +static void aws_on_error(struct afb_ws *ws, uint16_t code, const void *data, size_t size) +{ + if (ws->itf->on_error != NULL) + ws->itf->on_error(ws->closure, code, data, size); + else + afb_ws_hangup(ws); } + + diff --git a/src/afb-ws.h b/src/afb-ws.h index 1faec65c..1ef61ad8 100644 --- a/src/afb-ws.h +++ b/src/afb-ws.h @@ -21,13 +21,18 @@ struct afb_ws; struct afb_ws_itf { - void (*on_close) (void *, uint16_t code, char *, size_t size); + void (*on_close) (void *, uint16_t code, char *, size_t size); /* optional, if not set hangup is called */ void (*on_text) (void *, char *, size_t size); void (*on_binary) (void *, char *, size_t size); + void (*on_error) (void *, uint16_t code, const void *, size_t size); /* optional, if not set hangup is called */ + void (*on_hangup) (void *); /* optional, it is safe too call afb_ws_destroy within the callback */ }; extern struct afb_ws *afb_ws_create(int fd, const struct afb_ws_itf *itf, void *closure); -extern void afb_ws_close(struct afb_ws *ws, uint16_t code); -extern void afb_ws_text(struct afb_ws *ws, const char *text, size_t length); -extern void afb_ws_binary(struct afb_ws *ws, const void *data, size_t length); +extern void afb_ws_destroy(struct afb_ws *ws); +extern void afb_ws_hangup(struct afb_ws *ws); +extern int afb_ws_close(struct afb_ws *ws, uint16_t code, const char *reason); +extern int afb_ws_error(struct afb_ws *ws, uint16_t code, const char *reason); +extern int afb_ws_text(struct afb_ws *ws, const char *text, size_t length); +extern int afb_ws_binary(struct afb_ws *ws, const void *data, size_t length); diff --git a/src/websock.c b/src/websock.c index 945ccd9d..afb832e8 100644 --- a/src/websock.c +++ b/src/websock.c @@ -146,15 +146,18 @@ static inline int websock_send(struct websock *ws, int last, int rsv1, int rsv2, return websock_send_internal(ws, first, buffer, size); } -int websock_close(struct websock *ws) +int websock_close_empty(struct websock *ws) { - return websock_send(ws, 1, 0, 0, 0, OPCODE_CLOSE, NULL, 0); + return websock_close(ws, WEBSOCKET_CODE_NOT_SET, NULL, 0); } -int websock_close_code(struct websock *ws, uint16_t code, const void *data, size_t length) +int websock_close(struct websock *ws, uint16_t code, const void *data, size_t length) { unsigned char buffer[125]; + if (code == WEBSOCKET_CODE_NOT_SET && length == 0) + return websock_send(ws, 1, 0, 0, 0, OPCODE_CLOSE, NULL, 0); + /* checks the length */ if (length > 123) { errno = EINVAL; @@ -203,6 +206,14 @@ int websock_binary(struct websock *ws, int last, const void *data, size_t length return websock_send(ws, last, 0, 0, 0, OPCODE_BINARY, data, length); } +int websock_error(struct websock *ws, uint16_t code, const void *data, size_t size) +{ + int rc = websock_close(ws, code, data, size); + if (ws->itf->on_error != NULL) + ws->itf->on_error(ws->closure, code, data, size); + return rc; +} + static int read_header(struct websock *ws) { if (ws->lenhead < ws->szhead) { @@ -397,11 +408,11 @@ loop: goto loop; too_long_error: - websock_close_code(ws, WEBSOCKET_CODE_MESSAGE_TOO_LARGE, NULL, 0); + websock_error(ws, WEBSOCKET_CODE_MESSAGE_TOO_LARGE, NULL, 0); return 0; protocol_error: - websock_close_code(ws, WEBSOCKET_CODE_PROTOCOL_ERROR, NULL, 0); + websock_error(ws, WEBSOCKET_CODE_PROTOCOL_ERROR, NULL, 0); return 0; } diff --git a/src/websock.h b/src/websock.h index f06bc23d..dbb3b022 100644 --- a/src/websock.h +++ b/src/websock.h @@ -41,19 +41,22 @@ struct websock_itf { ssize_t (*writev) (void *, const struct iovec *, int); ssize_t (*readv) (void *, const struct iovec *, int); - void (*on_ping) (void *, size_t size); /* if not NULL, responsible of pong */ - void (*on_pong) (void *, size_t size); + void (*on_ping) (void *, size_t size); /* optional, if not NULL, responsible of pong */ + void (*on_pong) (void *, size_t size); /* optional */ void (*on_close) (void *, uint16_t code, size_t size); void (*on_text) (void *, int last, size_t size); void (*on_binary) (void *, int last, size_t size); void (*on_continue) (void *, int last, size_t size); int (*on_extension) (void *, int last, int rsv1, int rsv2, int rsv3, int opcode, size_t size); + + void (*on_error) (void *, uint16_t code, const void *data, size_t size); /* optional */ }; struct websock; -int websock_close(struct websock *ws); -int websock_close_code(struct websock *ws, uint16_t code, const void *data, size_t length); +int websock_close_empty(struct websock *ws); +int websock_close(struct websock *ws, uint16_t code, const void *data, size_t length); +int websock_error(struct websock *ws, uint16_t code, const void *data, size_t length); int websock_ping(struct websock *ws, const void *data, size_t length); int websock_pong(struct websock *ws, const void *data, size_t length); -- 2.16.6