From 6ea1d50ab6571551e1d0379940349911956c97ee Mon Sep 17 00:00:00 2001 From: =?utf8?q?Jos=C3=A9=20Bollo?= Date: Fri, 8 Apr 2016 10:29:54 +0200 Subject: [PATCH] new websocket handling MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Change-Id: I71fc5ff0412af6badce77485c98016916c56d235 Signed-off-by: José Bollo --- src/afb-hswitch.c | 12 +- src/afb-websock.c | 477 +++++++++--------------------------------------------- src/afb-websock.h | 5 +- src/afb-ws.c | 6 +- src/websock.c | 4 +- test/websock.js | 2 +- 6 files changed, 84 insertions(+), 422 deletions(-) diff --git a/src/afb-hswitch.c b/src/afb-hswitch.c index 3fcd77de..d531155d 100644 --- a/src/afb-hswitch.c +++ b/src/afb-hswitch.c @@ -74,18 +74,10 @@ int afb_hswitch_one_page_api_redirect(struct afb_hreq *hreq, void *data) int afb_hswitch_websocket_switch(struct afb_hreq *hreq, void *data) { - int later; - - afb_hreq_context(hreq); - if (hreq->lentail != 0 || !afb_websock_check(hreq, &later)) + if (hreq->lentail != 0) return 0; - if (!later) { - struct afb_websock *ws = afb_websock_create(hreq); - if (ws != NULL) - hreq->upgrade = 1; - } - return 1; + return afb_websock_check_upgrade(hreq /* TODO: protocols here */); } diff --git a/src/afb-websock.c b/src/afb-websock.c index d92c7b00..0fd5b66f 100644 --- a/src/afb-websock.c +++ b/src/afb-websock.c @@ -16,27 +16,20 @@ */ #define _GNU_SOURCE -#include + +#include #include #include -#include #include -#include - #include - -#include "websock.h" +#include #include "afb-ws-json.h" -#include "afb-req-itf.h" #include "afb-method.h" #include "afb-hreq.h" #include "afb-websock.h" -#include "afb-apis.h" -#include "session.h" -#include "utils-upoll.h" /**************** WebSocket connection upgrade ****************************/ @@ -80,29 +73,61 @@ static void make_accept_value(const char *key, char result[29]) result[28] = 0; } +static const char vseparators[] = " \t,"; + static int headerhas(const char *header, const char *needle) { - static const char sep[] = " \t,"; size_t len, n; n = strlen(needle); for(;;) { - header += strspn(header, sep); + header += strspn(header, vseparators); if (!*header) return 0; - len = strcspn(header, sep); + len = strcspn(header, vseparators); if (n == len && 0 == strncasecmp(needle, header, n)) return 1; header += len; } } -int afb_websock_check(struct afb_hreq *hreq, int *later) +struct protodef +{ + const char *name; + void *(*create)(int fd, struct AFB_clientCtx *context, void (*cleanup)(void*), void *closure); +}; + +static const struct protodef protodefs[] = { + { "x-afb-ws-json1", (void*)afb_ws_json_create }, + { NULL, NULL } +}; + +static const struct protodef *search_proto(const char *protocols) +{ + int i; + size_t len; + + for(;;) { + protocols += strspn(protocols, vseparators); + if (!*protocols) + return NULL; + len = strcspn(protocols, vseparators); + for (i = 0 ; protodefs[i].name != NULL ; i++) + if (!strncasecmp(protodefs[i].name, protocols, len) + && !protodefs[i].name[len]) + return &protodefs[i]; + protocols += len; + } +} + +int afb_websock_check_upgrade(struct afb_hreq *hreq) { const char *connection, *upgrade, *key, *version, *protocols; char acceptval[29]; int vernum; struct MHD_Response *response; + const struct protodef *proto; + void *ws; /* is an upgrade to websocket ? */ upgrade = afb_hreq_get_header(hreq, MHD_HTTP_HEADER_UPGRADE); @@ -111,11 +136,13 @@ int afb_websock_check(struct afb_hreq *hreq, int *later) /* is a connection for upgrade ? */ connection = afb_hreq_get_header(hreq, MHD_HTTP_HEADER_CONNECTION); - if (connection == NULL || !headerhas (connection, MHD_HTTP_HEADER_UPGRADE)) + if (connection == NULL + || !headerhas (connection, MHD_HTTP_HEADER_UPGRADE)) return 0; /* is a get ? */ - if(hreq->method != afb_method_get || strcasecmp(hreq->version, MHD_HTTP_VERSION_1_1)) + if (hreq->method != afb_method_get + || strcasecmp(hreq->version, MHD_HTTP_VERSION_1_1)) return 0; /* has a key and a version ? */ @@ -127,405 +154,51 @@ int afb_websock_check(struct afb_hreq *hreq, int *later) /* is a supported version ? */ vernum = atoi(version); if (vernum != 13) { - response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT); - MHD_add_response_header (response, sec_websocket_version_s, "13"); - MHD_queue_response (hreq->connection, MHD_HTTP_BAD_REQUEST, response); + response = MHD_create_response_from_buffer(0, NULL, + MHD_RESPMEM_PERSISTENT); + MHD_add_response_header (response, sec_websocket_version_s, + "13"); + MHD_queue_response (hreq->connection, MHD_HTTP_BAD_REQUEST, + response); MHD_destroy_response (response); - *later = 1; return 1; } /* is the protocol supported ? */ protocols = afb_hreq_get_header(hreq, sec_websocket_protocol_s); + proto = protocols == NULL ? NULL : search_proto(protocols); + if (proto == NULL) { + afb_hreq_reply_error(hreq, MHD_HTTP_PRECONDITION_FAILED); + return 1; + } - /* send the accept connection */ - make_accept_value(key, acceptval); - response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT); - MHD_add_response_header (response, sec_websocket_accept_s, acceptval); - MHD_add_response_header (response, MHD_HTTP_HEADER_CONNECTION, MHD_HTTP_HEADER_UPGRADE); - MHD_add_response_header (response, MHD_HTTP_HEADER_UPGRADE, websocket_s); - MHD_queue_response (hreq->connection, MHD_HTTP_SWITCHING_PROTOCOLS, response); - MHD_destroy_response (response); - - *later = 0; - return 1; -} - -/**************** WebSocket handling ****************************/ - -static ssize_t aws_writev(struct afb_websock *ws, const struct iovec *iov, int iovcnt); -static ssize_t aws_readv(struct afb_websock *ws, const struct iovec *iov, int iovcnt); -static void aws_disconnect(struct afb_websock *ws); -static void aws_on_close(struct afb_websock *ws, uint16_t code, size_t size); -static void aws_on_content(struct afb_websock *ws, int last, size_t size); -static void aws_on_readable(struct afb_websock *ws); - -static struct websock_itf aws_itf = { - .writev = (void*)aws_writev, - .readv = (void*)aws_readv, - .disconnect = (void*)aws_disconnect, - - .on_ping = NULL, - .on_pong = NULL, - .on_close = (void*)aws_on_close, - .on_text = (void*)aws_on_content, - .on_binary = (void*)aws_on_content, - .on_continue = (void*)aws_on_content, - .on_extension = NULL -}; - -struct afb_wsreq -{ - struct afb_websock *aws; - struct afb_wsreq *next; - struct json_object *id; - struct json_object *name; - struct json_object *token; - struct json_object *request; -}; - -struct afb_websock -{ - int fd; - struct afb_wsreq *requests; - struct MHD_Connection *connection; - struct AFB_clientCtx *context; - struct websock *ws; - struct upoll *up; - struct json_tokener *tokener; -}; - -static struct afb_arg wsreq_get(struct afb_wsreq *wsreq, const char *name); -static void wsreq_iterate(struct afb_wsreq *wsreq, int (*iterator)(void *closure, struct afb_arg arg), void *closure); -static void wsreq_fail(struct afb_wsreq *wsreq, const char *status, const char *info); -static void wsreq_success(struct afb_wsreq *wsreq, struct json_object *obj, const char *info); -static int wsreq_session_create(struct afb_wsreq *wsreq); -static int wsreq_session_check(struct afb_wsreq *wsreq, int refresh); -static void wsreq_session_close(struct afb_wsreq *wsreq); - -static const struct afb_req_itf wsreq_itf = { - .get = (void*)wsreq_get, - .iterate = (void*)wsreq_iterate, - .fail = (void*)wsreq_fail, - .success = (void*)wsreq_success, - .session_create = (void*)wsreq_session_create, - .session_check = (void*)wsreq_session_check, - .session_close = (void*)wsreq_session_close -}; - -struct afb_websock *afb_websock_create(struct afb_hreq *hreq) -{ - return (void*)afb_ws_json_create( - dup(MHD_get_connection_info(hreq->connection,MHD_CONNECTION_INFO_CONNECTION_FD)->connect_fd), + /* create the web socket */ + /* TODO fullfil ondisconnection and records!!! */ + ws = proto->create(dup(MHD_get_connection_info(hreq->connection,MHD_CONNECTION_INFO_CONNECTION_FD)->connect_fd), afb_hreq_context(hreq), (void*)MHD_resume_connection, hreq->connection); -} - -struct afb_websock *_afb_websock_create(struct afb_hreq *hreq) -{ - int fd; - struct afb_websock *result; - - fd = MHD_get_connection_info(hreq->connection, - MHD_CONNECTION_INFO_CONNECTION_FD)->connect_fd; - fd = dup(fd); - if (fd < 0) - return NULL; - - result = malloc(sizeof * result); - if (result == NULL) - goto error; - - result->fd = fd; - result->requests = NULL; - result->connection = hreq->connection; - result->context = ctxClientGet(afb_hreq_context(hreq)); - if (result->context == NULL) - goto error2; - - result->tokener = json_tokener_new(); - if (result->tokener == NULL) - goto error3; - - result->ws = websock_create_v13(&aws_itf, result); - if (result->ws == NULL) - goto error4; - - result->up = upoll_open(result->fd, result); - if (result->up == NULL) - goto error5; - - upoll_on_readable(result->up, (void*)aws_on_readable); - upoll_on_hangup(result->up, (void*)aws_disconnect); - return result; -error5: - websock_destroy(result->ws); -error4: - json_tokener_free(result->tokener); -error3: - ctxClientPut(result->context); -error2: - free(result); -error: - close(fd); - return NULL; -} - -static ssize_t aws_writev(struct afb_websock *ws, const struct iovec *iov, int iovcnt) -{ - ssize_t rc; - do { - rc = writev(ws->fd, iov, iovcnt); - } while(rc == -1 && errno == EINTR); - return rc; -} - -static ssize_t aws_readv(struct afb_websock *ws, const struct iovec *iov, int iovcnt) -{ - ssize_t rc; - do { - rc = readv(ws->fd, iov, iovcnt); - } while(rc == -1 && errno == EINTR); - return rc; -} - -static void aws_disconnect(struct afb_websock *ws) -{ - upoll_close(ws->up); - websock_destroy(ws->ws); - close(ws->fd); - MHD_resume_connection (ws->connection); - ctxClientPut(ws->context); - json_tokener_free(ws->tokener); - free(ws); -} - -static void aws_on_close(struct afb_websock *ws, uint16_t code, size_t size) -{ - /* do nothing */ -} - -static void aws_on_readable(struct afb_websock *ws) -{ - websock_dispatch(ws->ws); -} - -static int aws_handle_json(struct afb_websock *aws, struct json_object *obj) -{ - struct afb_req r; - int count, num; - struct json_object *type, *id, *name, *req, *token; - struct afb_wsreq *wsreq; - const char *api, *verb; - size_t lenapi, lenverb; - - /* protocol inspired by http://www.gir.fr/ocppjs/ocpp_srpc_spec.shtml */ - - /* the object must be an array of 4 or 5 elements */ - if (!json_object_is_type(obj, json_type_array)) - goto error; - count = json_object_array_length(obj); - if (count < 4 || count > 5) - goto error; - - /* get the 5 elements: type id name request token */ - type = json_object_array_get_idx(obj, 0); - id = json_object_array_get_idx(obj, 1); - name = json_object_array_get_idx(obj, 2); - req = json_object_array_get_idx(obj, 3); - token = json_object_array_get_idx(obj, 4); - - /* check the types: int string string object string */ - if (!json_object_is_type(type, json_type_int)) - goto error; - if (!json_object_is_type(id, json_type_string)) - goto error; - if (!json_object_is_type(name, json_type_string)) - goto error; - if (!json_object_is_type(req, json_type_object)) - goto error; - if (token != NULL && !json_object_is_type(token, json_type_string)) - goto error; - - /* the type is only 2 */ - num = json_object_get_int(type); - if (num != 2) - goto error; - - /* checks the api/verb structure of name */ - api = json_object_get_string(name); - for (lenapi = 0 ; api[lenapi] && api[lenapi] != '/' ; lenapi++); - if (!lenapi || !api[lenapi]) - goto error; - verb = &api[lenapi+1]; - for (lenverb = 0 ; verb[lenverb] && verb[lenverb] != '/' ; lenverb++); - if (!lenverb || verb[lenverb]) - goto error; - - /* allocates the request data */ - wsreq = malloc(sizeof *wsreq); - if (wsreq == NULL) - goto error; - - /* fill and record the request */ - wsreq->aws = aws; - wsreq->id = json_object_get(id); - wsreq->name = json_object_get(name); - wsreq->token = json_object_get(token); - wsreq->request = json_object_get(req); - wsreq->next = aws->requests; - aws->requests = wsreq; - json_object_put(obj); - - r.data = wsreq; - r.itf = &wsreq_itf; - afb_apis_call(r, aws->context, api, lenapi, verb, lenverb); - return 1; - -error: - json_object_put(obj); - return 0; -} - -static void aws_on_content(struct afb_websock *ws, int last, size_t size) -{ - ssize_t rrc; - char buffer[8000]; - struct json_object *obj; - - json_tokener_reset(ws->tokener); - while(size) { - rrc = websock_read(ws->ws, buffer, - size > sizeof buffer ? sizeof buffer : size); - if (rrc < 0) { - websock_close(ws->ws); - return; - } - size -= (size_t)rrc; - obj = json_tokener_parse_ex(ws->tokener, buffer, (int)rrc); - if (obj != NULL) { - if (!aws_handle_json(ws, obj)) { - websock_close(ws->ws); - return; - } - } else if (json_tokener_get_error(ws->tokener) != json_tokener_continue) { - websock_close(ws->ws); - return; - } - } -} - -static struct afb_arg wsreq_get(struct afb_wsreq *wsreq, const char *name) -{ - struct afb_arg arg; - struct json_object *value; - - if (json_object_object_get_ex(wsreq->request, name, &value)) { - arg.name = name; - arg.value = json_object_get_string(value); - arg.size = strlen(arg.value); - } else { - arg.name = NULL; - arg.value = NULL; - arg.size = 0; - } - arg.path = NULL; - return arg; -} - -static void wsreq_iterate(struct afb_wsreq *wsreq, int (*iterator)(void *closure, struct afb_arg arg), void *closure) -{ - struct afb_arg arg; - struct json_object_iterator it = json_object_iter_begin(wsreq->request); - struct json_object_iterator end = json_object_iter_end(wsreq->request); - - arg.size = 0; - arg.path = NULL; - while(!json_object_iter_equal(&it, &end)) { - arg.name = json_object_iter_peek_name(&it); - arg.value = json_object_get_string(json_object_iter_peek_value(&it)); - if (!iterator(closure, arg)) - break; - json_object_iter_next(&it); + if (ws == NULL) { + afb_hreq_reply_error(hreq, MHD_HTTP_INTERNAL_SERVER_ERROR); + return 1; } -} -static int wsreq_session_create(struct afb_wsreq *wsreq) -{ - struct AFB_clientCtx *context = wsreq->aws->context; - if (context->created) - return 0; - return wsreq_session_check(wsreq, 1); -} - -static int wsreq_session_check(struct afb_wsreq *wsreq, int refresh) -{ - const char *token; - struct AFB_clientCtx *context = wsreq->aws->context; - - if (wsreq->token == NULL) - return 0; - - token = json_object_get_string(wsreq->token); - if (token == NULL) - return 0; - - if (!ctxTokenCheck (context, token)) - return 0; - - if (refresh) { - ctxTokenNew (context); - } + /* send the accept connection */ + make_accept_value(key, acceptval); + response = MHD_create_response_from_buffer(0, NULL, + MHD_RESPMEM_PERSISTENT); + MHD_add_response_header (response, sec_websocket_accept_s, acceptval); + MHD_add_response_header (response, sec_websocket_protocol_s, + proto->name); + MHD_add_response_header (response, MHD_HTTP_HEADER_CONNECTION, + MHD_HTTP_HEADER_UPGRADE); + MHD_add_response_header (response, MHD_HTTP_HEADER_UPGRADE, + websocket_s); + MHD_queue_response (hreq->connection, MHD_HTTP_SWITCHING_PROTOCOLS, + response); + MHD_destroy_response (response); + hreq->upgrade = 1; return 1; } -static void wsreq_session_close(struct afb_wsreq *wsreq) -{ - struct AFB_clientCtx *context = wsreq->aws->context; - ctxClientClose(context); -} - - -static void wsreq_reply(struct afb_wsreq *wsreq, int retcode, const char *status, const char *info, json_object *resp) -{ - json_object *root, *request, *reply; - const char *message; - - /* builds the answering structure */ - root = json_object_new_object(); - json_object_object_add(root, "jtype", json_object_new_string("afb-reply")); - request = json_object_new_object(); - json_object_object_add(root, "request", request); - json_object_object_add(request, "status", json_object_new_string(status)); - if (info) - json_object_object_add(request, "info", json_object_new_string(info)); - if (resp) - json_object_object_add(root, "response", resp); - - /* make the reply */ - reply = json_object_new_array(); - json_object_array_add(reply, json_object_new_int(retcode)); - json_object_array_add(reply, wsreq->id); - json_object_array_add(reply, root); - json_object_array_add(reply, json_object_new_string(wsreq->aws->context->token)); - - /* emits the reply */ - message = json_object_to_json_string(reply); - websock_text(wsreq->aws->ws, message, strlen(message)); - json_object_put(reply); - - /* TODO eliminates the wsreq */ -} - -static void wsreq_fail(struct afb_wsreq *wsreq, const char *status, const char *info) -{ - wsreq_reply(wsreq, 4, status, info, NULL); -} - -static void wsreq_success(struct afb_wsreq *wsreq, json_object *obj, const char *info) -{ - wsreq_reply(wsreq, 3, "success", info, obj); -} - diff --git a/src/afb-websock.h b/src/afb-websock.h index 4f7ea912..4a0a3e55 100644 --- a/src/afb-websock.h +++ b/src/afb-websock.h @@ -16,9 +16,6 @@ */ struct afb_hreq; -struct afb_websock; - -extern int afb_websock_check(struct afb_hreq *hreq, int *later); -extern struct afb_websock *afb_websock_create(struct afb_hreq *hreq); +extern int afb_websock_check_upgrade(struct afb_hreq *hreq); diff --git a/src/afb-ws.c b/src/afb-ws.c index 3dae3390..49a314a6 100644 --- a/src/afb-ws.c +++ b/src/afb-ws.c @@ -116,8 +116,6 @@ void afb_ws_disconnect(struct afb_ws *ws) struct websock *wsi = ws->ws; ws->up = NULL; ws->ws = NULL; - upoll_on_hangup(up, NULL); - upoll_on_readable(up, NULL); upoll_close(up); websock_destroy(wsi); } @@ -162,10 +160,12 @@ static void aws_on_readable(struct afb_ws *ws) static void aws_on_hangup(struct afb_ws *ws) { + afb_ws_disconnect(ws); } static void aws_disconnect(struct afb_ws *ws) { + afb_ws_disconnect(ws); } static inline struct buf aws_pick_buffer(struct afb_ws *ws) @@ -204,7 +204,7 @@ static void aws_on_close(struct afb_ws *ws, uint16_t code, size_t size) else { aws_read(ws, size); b = aws_pick_buffer(ws); - ws->itf->on_close(ws, code, b.buffer, b.size); + ws->itf->on_close(ws->closure, code, b.buffer, b.size); } } diff --git a/src/websock.c b/src/websock.c index 41e063f8..c578eba4 100644 --- a/src/websock.c +++ b/src/websock.c @@ -199,10 +199,10 @@ static int check_control_header(struct websock *ws) return 0; if (FRAME_GET_RSV3(ws->header[0]) != 0) return 0; - if (FRAME_GET_MASK(ws->header[1])) - return 0; if (FRAME_GET_OPCODE(ws->header[0]) == OPCODE_CLOSE) return FRAME_GET_PAYLOAD_LEN(ws->header[1]) != 1; + if (FRAME_GET_MASK(ws->header[1])) + return 0; return FRAME_GET_PAYLOAD_LEN(ws->header[1]) == 0; } diff --git a/test/websock.js b/test/websock.js index 1ba136b4..84465588 100644 --- a/test/websock.js +++ b/test/websock.js @@ -29,7 +29,7 @@ AfbWsItf = (function(){ function AfbWsItf(base, onopen, onabort, ctx) { var wl = window.location; var u = "ws://"+wl.host+"/"+base; - this.ws = new (WebSocket || MozWebSocket)(u, [ "afb1", "afb2" ]); + this.ws = new (WebSocket || MozWebSocket)(u, [ "x-afb-ws-json1" ]); this.pendings = {}; this.counter = 0; this.ctx = ctx || new AfbCtxItf(); -- 2.16.6