From: José Bollo Date: Fri, 13 May 2016 14:14:16 +0000 (+0200) Subject: example of integration with websocket in C X-Git-Tag: blowfish_2.0.1~117^2~1 X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?p=src%2Fapp-framework-binder.git;a=commitdiff_plain;h=146f95b776c7a424e672b27386fbb8392bc0ffb7 example of integration with websocket in C The file src/afb-client-demo.c provides an example of how to make a simple C client that connects to the daemon using the websocket protocol x-afb-json1. Change-Id: I31c926b2c42101a53e1ea36b4f67f095614db4a0 Signed-off-by: José Bollo --- diff --git a/CMakeLists.txt b/CMakeLists.txt index cee5a07f..12e4be95 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -123,9 +123,3 @@ ADD_DEFINITIONS(-DPLUGIN_INSTALL_DIR="${plugin_install_dir}") ADD_SUBDIRECTORY(src) ADD_SUBDIRECTORY(plugins) -ADD_EXECUTABLE(afb-daemon $) -INCLUDE_DIRECTORIES(${include_dirs}) -TARGET_LINK_LIBRARIES(afb-daemon ${link_libraries}) - -INSTALL(TARGETS afb-daemon - RUNTIME DESTINATION ${CMAKE_INSTALL_PREFIX}/${CMAKE_INSTALL_BINDIR}) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 68beea00..67e3abce 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,5 +1,5 @@ -ADD_LIBRARY(src OBJECT +ADD_LIBRARY(afb-lib OBJECT afb-api-dbus.c afb-api-so.c afb-apis.c @@ -16,11 +16,21 @@ ADD_LIBRARY(src OBJECT afb-ws-json1.c afb-ws.c afb-wsj1.c - main.c session.c verbose.c websock.c ) + +INCLUDE_DIRECTORIES(${include_dirs}) + +ADD_EXECUTABLE(afb-daemon $ main.c) +INCLUDE_DIRECTORIES(${include_dirs}) +TARGET_LINK_LIBRARIES(afb-daemon ${link_libraries}) + +ADD_EXECUTABLE(afb-client-demo $ afb-client-demo.c) INCLUDE_DIRECTORIES(${include_dirs}) +TARGET_LINK_LIBRARIES(afb-client-demo ${link_libraries}) +INSTALL(TARGETS afb-daemon + RUNTIME DESTINATION ${CMAKE_INSTALL_PREFIX}/${CMAKE_INSTALL_BINDIR}) diff --git a/src/afb-client-demo.c b/src/afb-client-demo.c new file mode 100644 index 00000000..004ab3f2 --- /dev/null +++ b/src/afb-client-demo.c @@ -0,0 +1,206 @@ +/* + * Copyright (C) 2015 "IoT.bzh" + * Author "Fulup Ar Foll" + * Author José Bollo + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "afb-common.h" +#include "afb-wsj1.h" +#include "afb-ws-client.h" + +static void on_hangup(void *closure, struct afb_wsj1 *wsj1); +static void on_call(void *closure, const char *api, const char *verb, struct afb_wsj1_msg *msg); +static void on_event(void *closure, const char *event, struct afb_wsj1_msg *msg); +static int io_event_callback(sd_event_source *src, int fd, uint32_t revents, void *closure); +static void emit(const char *api, const char *verb, const char *object); + +static struct afb_wsj1_itf itf = { + .on_hangup = on_hangup, + .on_call = on_call, + .on_event = on_event +}; + +static struct afb_wsj1 *wsj1; +static int exonrep; +static int callcount; +static sd_event_source *evsrc; + +static void usage(int status, char *arg0) +{ + char *name = strrchr(arg0, '/'); + name = name ? name + 1 : arg0; + fprintf(status ? stderr : stdin, "usage: %s uri [api verb data]\n", name); + exit(status); +} + +int main(int ac, char **av, char **env) +{ + if (ac != 2 && ac != 5) + usage(1, av[0]); + if (!strcmp(av[1], "-h") || !strcmp(av[1], "--help")) + usage(0, av[0]); + + wsj1 = afb_ws_client_connect_wsj1(av[1], &itf, NULL); + if (wsj1 == NULL) { + fprintf(stderr, "connection to %s failed: %m\n", av[1]); + return 1; + } + + if (ac == 2) { + fcntl(0, F_SETFL, O_NONBLOCK); + sd_event_add_io(afb_common_get_event_loop(), &evsrc, 0, EPOLLIN, io_event_callback, NULL); + } else { + exonrep = 1; + emit(av[2], av[3], av[4]); + } + for(;;) + sd_event_run(afb_common_get_event_loop(), 30000000); + return 0; +} + +static void on_hangup(void *closure, struct afb_wsj1 *wsj1) +{ + printf("ON-HANGUP\n"); + exit(0); +} + +static void on_call(void *closure, const char *api, const char *verb, struct afb_wsj1_msg *msg) +{ + int rc; + printf("ON-CALL %s/%s(%s)\n", api, verb, afb_wsj1_msg_object_s(msg)); + rc = afb_wsj1_reply_error_s(msg, "\"unimplemented\"", NULL); + if (rc < 0) + fprintf(stderr, "replying failed: %m\n"); +} + +static void on_event(void *closure, const char *event, struct afb_wsj1_msg *msg) +{ + printf("ON-EVENT %s(%s)\n", event, afb_wsj1_msg_object_s(msg)); +} + +static void event(const char *event, const char *object) +{ + int rc; + + rc = afb_wsj1_send_event_s(wsj1, event, object); + if (rc < 0) + fprintf(stderr, "sending !%s(%s) failed: %m\n", event, object); +} + +static void on_reply(void *closure, struct afb_wsj1_msg *msg) +{ + printf("ON-REPLY %s: %s\n", (char*)closure, afb_wsj1_msg_object_s(msg)); + free(closure); + callcount--; + if (exonrep && !callcount) + //afb_wsj1_hangup(afb_wsj1_msg_wsj1(msg)); + exit(0); +} + +static void call(const char *api, const char *verb, const char *object) +{ + static int num = 0; + char *key; + int rc; + + rc = asprintf(&key, "%d:%s/%s", ++num, api, verb); + callcount++; + rc = afb_wsj1_call_s(wsj1, api, verb, object, on_reply, key); + if (rc < 0) { + fprintf(stderr, "calling %s/%s(%s) failed: %m\n", api, verb, object); + callcount--; + } +} + +static void emit(const char *api, const char *verb, const char *object) +{ + if (api[0] == '!' && api[1] == 0) + event(verb, object); + else + call(api, verb, object); +} + +static int io_event_callback(sd_event_source *src, int fd, uint32_t revents, void *closure) +{ + static size_t count = 0; + static char line[16384]; + static char sep[] = " \t"; + static char sepnl[] = " \t\n"; + + ssize_t rc; + size_t pos; + + /* read the buffer */ + do { rc = read(0, line + count, sizeof line - count); } while (rc < 0 && errno == EINTR); + if (rc < 0) { + fprintf(stderr, "read error: %m\n"); + exit(1); + } + if (rc == 0) { + if (!callcount) + exit(0); + exonrep = 1; + sd_event_source_unref(evsrc); + } + count += (size_t)rc; + + /* normalise the buffer content */ + /* TODO: handle backspace \x7f */ + + /* process the lines */ + pos = 0; + for(;;) { + size_t i, api[2], verb[2], rest[2]; + i = pos; + while(i < count && strchr(sep, line[i])) i++; + api[0] = i; while(i < count && !strchr(sepnl, line[i])) i++; api[1] = i; + while(i < count && strchr(sep, line[i])) i++; + verb[0] = i; while(i < count && !strchr(sepnl, line[i])) i++; verb[1] = i; + while(i < count && strchr(sep, line[i])) i++; + rest[0] = i; while(i < count && line[i] != '\n') i++; rest[1] = i; + if (i == count) break; + line[i++] = 0; + if (api[0] == api[1] || verb[0] == verb[1] || rest[0] == rest[1]) + fprintf(stderr, "bad line: %s\n", line+pos); + else { + line[api[1]] = line[verb[1]] = 0; + emit(line + api[0], line + verb[0], line + rest[0]); + } + pos = i; + } + count -= pos; + if (count == sizeof line) { + fprintf(stderr, "overflow\n"); + exit(1); + } + if (count) + memmove(line, line + pos, count); + return 1; +} + diff --git a/src/afb-context.c b/src/afb-context.c index 2f391dec..62f1f06d 100644 --- a/src/afb-context.c +++ b/src/afb-context.c @@ -23,8 +23,7 @@ #include "session.h" #include "afb-context.h" - -void afb_context_init(struct afb_context *context, struct AFB_clientCtx *session, const char *token) +static void init_context(struct afb_context *context, struct AFB_clientCtx *session, const char *token) { assert(session != NULL); @@ -42,6 +41,11 @@ void afb_context_init(struct afb_context *context, struct AFB_clientCtx *session } } +void afb_context_init(struct afb_context *context, struct AFB_clientCtx *session, const char *token) +{ + init_context(context, ctxClientAddRef(session), token); +} + int afb_context_connect(struct afb_context *context, const char *uuid, const char *token) { int created; @@ -50,7 +54,7 @@ int afb_context_connect(struct afb_context *context, const char *uuid, const cha session = ctxClientGetSession (uuid, &created); if (session == NULL) return -1; - afb_context_init(context, session, token); + init_context(context, session, token); if (created) context->created = 1; return 0; diff --git a/src/afb-ws-client.c b/src/afb-ws-client.c index 6ed3f50f..ceb00cbe 100644 --- a/src/afb-ws-client.c +++ b/src/afb-ws-client.c @@ -82,8 +82,12 @@ static char *strjoin(int count, const char **strings, const char *separ) for(count = 0 ; strings[count] != NULL ; count++); /* compute the length of the result */ - length = 0; - if (count != 0) { + if (count == 0) + length = 0; + else { + length = (unsigned)(count - 1) * strlen(separ); + for (idx = 0 ; idx < count ; idx ++) + length += strlen(strings[idx]); } /* allocates the result */ @@ -104,10 +108,11 @@ static char *strjoin(int count, const char **strings, const char *separ) } /* creates the http message for the request */ -static int make_request(char **request, const char *path, const char *key, const char *protocols) +static int make_request(char **request, const char *path, const char *host, const char *key, const char *protocols) { int rc = asprintf(request, - "GET %s HTTP1.1\r\n" + "GET %s HTTP/1.1\r\n" + "Host: %s\r\n" "Upgrade: websocket\r\n" "Connection: Upgrade\r\n" "Sec-WebSocket-Version: 13\r\n" @@ -116,6 +121,7 @@ static int make_request(char **request, const char *path, const char *key, const "Content-Length: 0\r\n" "\r\n" , path + , host , key , protocols ); @@ -128,7 +134,7 @@ static int make_request(char **request, const char *path, const char *key, const } /* create the request and send it to fd, returns the expected accept string */ -static const char *send_request(int fd, const char **protocols, const char *path) +static const char *send_request(int fd, const char **protocols, const char *path, const char *host) { const char *key, *ack; char *protolist, *request; @@ -141,7 +147,7 @@ static const char *send_request(int fd, const char **protocols, const char *path /* create the request */ getkeypair(&key, &ack); - length = make_request(&request, path, key, protolist); + length = make_request(&request, path, host, key, protolist); free(protolist); if (length < 0) return NULL; @@ -196,12 +202,12 @@ static int receive_response(int fd, const char **protocols, const char *ack) if (len != 8 || 0 != strncmp(line, "HTTP/1.1", 8)) goto error; it = line + len; - len = strspn(line, " "); + len = strspn(it, " "); if (len == 0) goto error; it += len; - len = strcspn(line, " "); - if (len != 3 || 0 != strncmp(line, "101", 3)) + len = strcspn(it, " "); + if (len != 3 || 0 != strncmp(it, "101", 3)) goto error; /* reads the rest of the response until empty line */ @@ -254,9 +260,9 @@ error: return result; } -static int negociate(int fd, const char **protocols, const char *path) +static int negociate(int fd, const char **protocols, const char *path, const char *host) { - const char *ack = send_request(fd, protocols, path); + const char *ack = send_request(fd, protocols, path, host); return ack == NULL ? -1 : receive_response(fd, protocols, ack); } @@ -296,8 +302,7 @@ static int parse_uri(const char *uri, char **host, char **service, const char ** /* make the result */ *host = strndup(h, hlen); if (*host != NULL) { - return -1; - *service = plen ? strndup(h, hlen) : strdup("http"); + *service = plen ? strndup(p, plen) : strdup("http"); if (*service != NULL) { *path = uri; return 0; @@ -321,7 +326,7 @@ static const char *proto_json1[2] = { "x-afb-ws-json1", NULL }; struct afb_wsj1 *afb_ws_client_connect_wsj1(const char *uri, struct afb_wsj1_itf *itf, void *closure) { int rc, fd; - char *host, *service; + char *host, *service, xhost[32]; const char *path; struct addrinfo hint, *rai, *iai; struct afb_wsj1 *result; @@ -347,11 +352,17 @@ struct afb_wsj1 *afb_ws_client_connect_wsj1(const char *uri, struct afb_wsj1_itf result = NULL; iai = rai; while (iai != NULL) { + struct sockaddr_in *a = (struct sockaddr_in*)(iai->ai_addr); + unsigned char *ipv4 = (unsigned char*)&(a->sin_addr.s_addr); + unsigned char *port = (unsigned char*)&(a->sin_port); + sprintf(xhost, "%d.%d.%d.%d:%d", + (int)ipv4[0], (int)ipv4[1], (int)ipv4[2], (int)ipv4[3], + (((int)port[0]) << 8)|(int)port[1]); fd = socket(iai->ai_family, iai->ai_socktype, iai->ai_protocol); if (fd >= 0) { rc = connect(fd, iai->ai_addr, iai->ai_addrlen); if (rc == 0) { - rc = negociate(fd, proto_json1, path); + rc = negociate(fd, proto_json1, path, xhost); if (rc == 0) { result = afb_wsj1_create(fd, itf, closure); if (result != NULL) diff --git a/src/afb-ws-client.h b/src/afb-ws-client.h new file mode 100644 index 00000000..2117bb8c --- /dev/null +++ b/src/afb-ws-client.h @@ -0,0 +1,23 @@ +/* + * Copyright 2016 IoT.bzh + * Author: José Bollo + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +struct afb_wsj1; +struct afb_wsj1_itf; + +extern struct afb_wsj1 *afb_ws_client_connect_wsj1(const char *uri, struct afb_wsj1_itf *itf, void *closure); diff --git a/src/afb-ws.c b/src/afb-ws.c index 5e8732db..22d96914 100644 --- a/src/afb-ws.c +++ b/src/afb-ws.c @@ -272,6 +272,7 @@ int afb_ws_texts(struct afb_ws *ws, ...) } ios[count].iov_base = (void*)s; ios[count].iov_len = strlen(s); + count++; s = va_arg(args, const char *); } va_end(args); diff --git a/src/afb-wsj1.c b/src/afb-wsj1.c index 0a7dfd80..7d4730e4 100644 --- a/src/afb-wsj1.c +++ b/src/afb-wsj1.c @@ -34,8 +34,8 @@ #define RETERR 4 #define EVENT 5 -static void wsj1_on_hangup(struct afb_wsj1 *ws); -static void wsj1_on_text(struct afb_wsj1 *ws, char *text, size_t size); +static void wsj1_on_hangup(struct afb_wsj1 *wsj1); +static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size); static struct afb_ws_itf wsj1_itf = { .on_hangup = (void*)wsj1_on_hangup, @@ -127,10 +127,10 @@ void afb_wsj1_unref(struct afb_wsj1 *wsj1) } } -static void wsj1_on_hangup(struct afb_wsj1 *ws) +static void wsj1_on_hangup(struct afb_wsj1 *wsj1) { - if (ws->itf->on_hangup != NULL) - ws->itf->on_hangup(ws->closure); + if (wsj1->itf->on_hangup != NULL) + wsj1->itf->on_hangup(wsj1->closure, wsj1); } @@ -302,7 +302,8 @@ static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size) afb_wsj1_addref(wsj1); msg->wsj1 = wsj1; msg->next = wsj1->messages; - msg->next->previous = msg; + if (msg->next != NULL) + msg->next->previous = msg; wsj1->messages = msg; /* incoke the handler */ @@ -418,90 +419,11 @@ const char *afb_wsj1_msg_token(struct afb_wsj1_msg *msg) return msg->token; } - - - - - - - - - -#if 0 - - - - - -static void wsj1_emit(struct afb_wsj1 *wsj1, int code, const char *id, size_t idlen, struct json_object *data, const char *token) +struct afb_wsj1 *afb_wsj1_msg_wsj1(struct afb_wsj1_msg *msg) { - json_object *msg; - const char *txt; - - /* pack the message */ - msg = json_object_new_array(); - json_object_array_add(msg, json_object_new_int(code)); - json_object_array_add(msg, json_object_new_string_len(id, (int)idlen)); - json_object_array_add(msg, data); - if (token) - json_object_array_add(msg, json_object_new_string(token)); - - /* emits the reply */ - txt = json_object_to_json_string(msg); - afb_ws_text(wsj1->ws, txt, strlen(txt)); - json_object_put(msg); + return msg->wsj1; } -static void wsj1_msg_reply(struct afb_wsj1_msg *msg, int retcode, const char *status, const char *info, json_object *resp) -{ - const char *token = afb_context_sent_token(&msg->context); - wsj1_emit(msg->wsj1, retcode, msg->id, msg->idlen, afb_msg_json_reply(status, info, resp, token, NULL), token); -} - -static void wsj1_msg_fail(struct afb_wsj1_msg *msg, const char *status, const char *info) -{ - wsj1_msg_reply(msg, RETERR, status, info, NULL); -} - -static void wsj1_msg_success(struct afb_wsj1_msg *msg, json_object *obj, const char *info) -{ - wsj1_msg_reply(msg, RETOK, "success", info, obj); -} - -static const char *wsj1_msg_raw(struct afb_wsj1_msg *msg, size_t *size) -{ - *size = msg->objlen; - return msg->obj; -} - -static void wsj1_msg_send(struct afb_wsj1_msg *msg, const char *buffer, size_t size) -{ - afb_ws_text(msg->wsj1->ws, buffer, size); -} - -static void wsj1_send_event(struct afb_wsj1 *wsj1, const char *event, struct json_object *object) -{ - wsj1_emit(wsj1, EVENT, event, strlen(event), afb_msg_json_event(event, object), NULL); -} - - - - - - - - - - - - - - - - -#endif - - static int wsj1_send_isot(struct afb_wsj1 *wsj1, int i1, const char *s1, const char *o1, const char *t1) { char code[2] = { (char)('0' + i1), 0 }; diff --git a/src/afb-wsj1.h b/src/afb-wsj1.h index d96367f9..ab390cf0 100644 --- a/src/afb-wsj1.h +++ b/src/afb-wsj1.h @@ -23,7 +23,7 @@ struct afb_wsj1_msg; struct json_object; struct afb_wsj1_itf { - void (*on_hangup)(void *closure); + void (*on_hangup)(void *closure, struct afb_wsj1 *wsj1); void (*on_call)(void *closure, const char *api, const char *verb, struct afb_wsj1_msg *msg); void (*on_event)(void *closure, const char *event, struct afb_wsj1_msg *msg); }; @@ -58,6 +58,7 @@ extern const char *afb_wsj1_msg_api(struct afb_wsj1_msg *msg); extern const char *afb_wsj1_msg_verb(struct afb_wsj1_msg *msg); extern const char *afb_wsj1_msg_event(struct afb_wsj1_msg *msg); extern const char *afb_wsj1_msg_token(struct afb_wsj1_msg *msg); +extern struct afb_wsj1 *afb_wsj1_msg_wsj1(struct afb_wsj1_msg *msg); extern const char *afb_wsj1_msg_object_s(struct afb_wsj1_msg *msg); extern struct json_object *afb_wsj1_msg_object_j(struct afb_wsj1_msg *msg); diff --git a/src/session.c b/src/session.c index 22e50cfc..326b16af 100644 --- a/src/session.c +++ b/src/session.c @@ -197,11 +197,10 @@ static void ctxStoreCleanUp (time_t now) // Loop on Sessions Table and remove anything that is older than timeout for (idx=0; idx < sessions.max; idx++) { - ctx = ctxClientAddRef(sessions.store[idx]); + ctx = sessions.store[idx]; if (ctx != NULL && ctxStoreTooOld(ctx, now)) { ctxClientClose (ctx); } - ctxClientUnref(ctx); } } @@ -291,6 +290,10 @@ void ctxClientClose (struct AFB_clientCtx *clientCtx) ctxUuidFreeCB (clientCtx); while(clientCtx->listeners != NULL) ctxClientEventListenerRemove(clientCtx, clientCtx->listeners->listener); + if (clientCtx->refcount == 0) { + ctxStoreDel (clientCtx); + free(clientCtx); + } } }