2 * Copyright 2016 IoT.bzh
3 * Author: José Bollo <jose.bollo@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
30 #include "utils-upoll.h"
32 static ssize_t aws_writev(struct afb_ws *ws, const struct iovec *iov, int iovcnt);
33 static ssize_t aws_readv(struct afb_ws *ws, const struct iovec *iov, int iovcnt);
34 static void aws_on_close(struct afb_ws *ws, uint16_t code, size_t size);
35 static void aws_on_text(struct afb_ws *ws, int last, size_t size);
36 static void aws_on_binary(struct afb_ws *ws, int last, size_t size);
37 static void aws_on_continue(struct afb_ws *ws, int last, size_t size);
38 static void aws_on_readable(struct afb_ws *ws);
39 static void aws_on_hangup(struct afb_ws *ws);
41 static struct websock_itf aws_itf = {
42 .writev = (void*)aws_writev,
43 .readv = (void*)aws_readv,
47 .on_close = (void*)aws_on_close,
48 .on_text = (void*)aws_on_text,
49 .on_binary = (void*)aws_on_binary,
50 .on_continue = (void*)aws_on_continue,
63 enum { none, text, binary } type;
64 const struct afb_ws_itf *itf;
71 struct afb_ws *afb_ws_create(int fd, const struct afb_ws_itf *itf, void *closure)
73 struct afb_ws *result;
77 result = malloc(sizeof * result);
84 result->closure = closure;
86 result->ws = websock_create_v13(&aws_itf, result);
87 if (result->ws == NULL)
90 result->up = upoll_open(result->fd, result);
91 if (result->up == NULL)
94 result->buffer.buffer = NULL;
95 result->buffer.size = 0;
97 upoll_on_readable(result->up, (void*)aws_on_readable);
98 upoll_on_hangup(result->up, (void*)aws_on_hangup);
103 websock_destroy(result->ws);
111 void afb_ws_disconnect(struct afb_ws *ws)
113 struct upoll *up = ws->up;
114 struct websock *wsi = ws->ws;
118 websock_destroy(wsi);
121 void afb_ws_close(struct afb_ws *ws, uint16_t code)
123 websock_close_code(ws->ws, code, NULL, 0);
126 void afb_ws_text(struct afb_ws *ws, const char *text, size_t length)
128 websock_text(ws->ws, 1, text, length);
131 void afb_ws_binary(struct afb_ws *ws, const void *data, size_t length)
133 websock_binary(ws->ws, 1, data, length);
136 static ssize_t aws_writev(struct afb_ws *ws, const struct iovec *iov, int iovcnt)
140 rc = writev(ws->fd, iov, iovcnt);
141 } while(rc == -1 && errno == EINTR);
145 static ssize_t aws_readv(struct afb_ws *ws, const struct iovec *iov, int iovcnt)
149 rc = readv(ws->fd, iov, iovcnt);
150 } while(rc == -1 && errno == EINTR);
158 static void aws_on_readable(struct afb_ws *ws)
160 if (websock_dispatch(ws->ws) < 0 && errno == EPIPE)
161 afb_ws_disconnect(ws);
164 static void aws_on_hangup(struct afb_ws *ws)
166 afb_ws_disconnect(ws);
169 static inline struct buf aws_pick_buffer(struct afb_ws *ws)
171 struct buf result = ws->buffer;
172 ws->buffer.buffer = NULL;
177 static int aws_read(struct afb_ws *ws, size_t size)
183 buffer = realloc(ws->buffer.buffer, ws->buffer.size + size + 1);
186 ws->buffer.buffer = buffer;
187 sz = websock_read(ws->ws, &buffer[ws->buffer.size], size);
188 if ((size_t)sz != size)
190 ws->buffer.size += size;
195 static void aws_on_close(struct afb_ws *ws, uint16_t code, size_t size)
200 if (ws->itf->on_close == NULL)
201 websock_drop(ws->ws);
204 b = aws_pick_buffer(ws);
205 ws->itf->on_close(ws->closure, code, b.buffer, b.size);
209 static void aws_on_text(struct afb_ws *ws, int last, size_t size)
211 if (ws->type != none) {
212 websock_drop(ws->ws);
213 websock_close_code(ws->ws, WEBSOCKET_CODE_PROTOCOL_ERROR, NULL, 0);
214 } else if (ws->itf->on_text == NULL) {
215 websock_drop(ws->ws);
216 websock_close_code(ws->ws, WEBSOCKET_CODE_CANT_ACCEPT, NULL, 0);
219 aws_on_continue(ws, last, size);
223 static void aws_on_binary(struct afb_ws *ws, int last, size_t size)
225 if (ws->type != none) {
226 websock_drop(ws->ws);
227 websock_close_code(ws->ws, WEBSOCKET_CODE_PROTOCOL_ERROR, NULL, 0);
228 } else if (ws->itf->on_binary == NULL) {
229 websock_drop(ws->ws);
230 websock_close_code(ws->ws, WEBSOCKET_CODE_CANT_ACCEPT, NULL, 0);
233 aws_on_continue(ws, last, size);
237 static void aws_on_continue(struct afb_ws *ws, int last, size_t size)
242 if (ws->type == none) {
243 websock_drop(ws->ws);
244 websock_close_code(ws->ws, WEBSOCKET_CODE_PROTOCOL_ERROR, NULL, 0);
246 if (!aws_read(ws, size)) {
247 aws_on_close(ws, WEBSOCKET_CODE_ABNORMAL, 0);
249 istxt = ws->type == text;
251 b = aws_pick_buffer(ws);
252 b.buffer[b.size] = 0;
253 (istxt ? ws->itf->on_text : ws->itf->on_binary)(ws->closure, b.buffer, b.size);