#include "afb-ws.h"
#include "afb-msg-json.h"
#include "afb-proto-ws.h"
-#include "jobs.h"
#include "fdev.h"
struct afb_proto_ws;
struct client_call {
struct client_call *next; /* the next call */
struct afb_proto_ws *protows; /* the proto_ws */
- void *request;
+ void *request; /* the request closure */
uint32_t callid; /* the message identifier */
};
void (*on_hangup)(void *closure);
/* queuing facility for processing messages */
- int (*queuing)(void (*process)(int s, void *c), void *closure);
+ int (*queuing)(struct afb_proto_ws *proto, void (*process)(int s, void *c), void *closure);
};
/******************* streaming objects **********************************/
binary->rb.head = data;
binary->rb.end = data + size;
if (!protows->queuing
- || protows->queuing(processing, binary) < 0)
+ || protows->queuing(protows, processing, binary) < 0)
processing(0, binary);
return;
}
{
struct afb_proto_ws *protows = closure;
struct client_describe *cd, *ncd;
+ struct client_call *call, *ncall;
+
+ ncd = __atomic_exchange_n(&protows->describes, NULL, __ATOMIC_RELAXED);
+ ncall = __atomic_exchange_n(&protows->calls, NULL, __ATOMIC_RELAXED);
+
+ while (ncall) {
+ call= ncall;
+ ncall = call->next;
+ protows->client_itf->on_reply(protows->closure, call->request, NULL, "disconnected", "server hung up");
+ free(call);
+ }
- ncd = protows->describes;
while (ncd) {
cd= ncd;
ncd = cd->next;
void afb_proto_ws_unref(struct afb_proto_ws *protows)
{
- if (!__atomic_sub_fetch(&protows->refcount, 1, __ATOMIC_RELAXED)) {
+ if (protows && !__atomic_sub_fetch(&protows->refcount, 1, __ATOMIC_RELAXED)) {
afb_proto_ws_hangup(protows);
afb_ws_destroy(protows->ws);
pthread_mutex_destroy(&protows->mutex);
protows->on_hangup = on_hangup;
}
-void afb_proto_ws_set_queuing(struct afb_proto_ws *protows, int (*queuing)(void (*)(int,void*), void*))
+void afb_proto_ws_set_queuing(struct afb_proto_ws *protows, int (*queuing)(struct afb_proto_ws*, void (*)(int,void*), void*))
{
protows->queuing = queuing;
}