return -1;
}
+/*
+ * Avoids the 'listener' to watch 'eventid'
+ * Returns 0 in case of success or else -1.
+ */
+int afb_evt_watch_sub_eventid(struct afb_evt_listener *listener, uint16_t eventid)
+{
+ struct afb_evt_watch *watch;
+ struct afb_evtid *evtid;
+
+ /* search the existing watch */
+ pthread_rwlock_wrlock(&listener->rwlock);
+ watch = listener->watchs;
+ while(watch != NULL) {
+ evtid = watch->evtid;
+ if (evtid->id == eventid) {
+ if (watch->activity != 0) {
+ watch->activity--;
+ if (watch->activity == 0 && listener->itf->remove != NULL)
+ listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
+ }
+ pthread_rwlock_unlock(&listener->rwlock);
+ return 0;
+ }
+ watch = watch->next_by_listener;
+ }
+ pthread_rwlock_unlock(&listener->rwlock);
+ errno = ENOENT;
+ return -1;
+}
+
#if WITH_AFB_HOOK
/*
* update the hooks for events
extern int afb_evt_watch_add_evtid(struct afb_evt_listener *listener, struct afb_evtid *evtid);
extern int afb_evt_watch_sub_evtid(struct afb_evt_listener *listener, struct afb_evtid *evtid);
+extern int afb_evt_watch_sub_eventid(struct afb_evt_listener *listener, uint16_t eventid);
extern struct afb_event_x2 *afb_evt_event_x2_create(const char *fullname);
extern int afb_evt_evtid_hooked_push(struct afb_evtid *evtid, struct json_object *obj);
extern int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *object);
extern void afb_evt_update_hooks();
-#endif
\ No newline at end of file
+#endif
- push or broadcast data as an event
+ - signal unexpected event
+
*/
/************** constants for protocol definition *************************/
#define CHAR_FOR_EVT_PUSH 'P' /* server -> client */
#define CHAR_FOR_EVT_SUBSCRIBE 'X' /* server -> client */
#define CHAR_FOR_EVT_UNSUBSCRIBE 'x' /* server -> client */
+#define CHAR_FOR_EVT_UNEXPECTED 'U' /* client -> server */
#define CHAR_FOR_DESCRIBE 'D' /* client -> server */
#define CHAR_FOR_DESCRIPTION 'd' /* server -> client */
#define CHAR_FOR_TOKEN_ADD 'T' /* client -> server */
return client_send_cmd_id16_optstr(protows, CHAR_FOR_TOKEN_DROP, tokenid, NULL);
}
+int afb_proto_ws_client_event_unexpected(struct afb_proto_ws *protows, uint16_t eventid)
+{
+ return client_send_cmd_id16_optstr(protows, CHAR_FOR_EVT_UNEXPECTED, eventid, NULL);
+}
+
int afb_proto_ws_client_call(
struct afb_proto_ws *protows,
const char *verb,
protows->server_itf->on_token_remove(protows->closure, tokenid);
}
+static void server_on_event_unexpected(struct afb_proto_ws *protows, struct readbuf *rb)
+{
+ uint16_t eventid;
+
+ if (readbuf_uint16(rb, &eventid))
+ protows->server_itf->on_event_unexpected(protows->closure, eventid);
+}
+
/* on version offer */
static void server_on_version_offer(struct afb_proto_ws *protows, struct readbuf *rb)
{
case CHAR_FOR_TOKEN_DROP:
server_on_token_drop(binary->protows, &binary->rb);
break;
+ case CHAR_FOR_EVT_UNEXPECTED:
+ server_on_event_unexpected(binary->protows, &binary->rb);
+ break;
case CHAR_FOR_VERSION_OFFER:
server_on_version_offer(binary->protows, &binary->rb);
break;
void (*on_token_remove)(void *closure, uint16_t tokenid);
void (*on_call)(void *closure, struct afb_proto_ws_call *call, const char *verb, struct json_object *args, uint16_t sessionid, uint16_t tokenid, const char *user_creds);
void (*on_describe)(void *closure, struct afb_proto_ws_describe *describe);
+ void (*on_event_unexpected)(void *closure, uint16_t eventid);
};
extern struct afb_proto_ws *afb_proto_ws_create_client(struct fdev *fdev, const struct afb_proto_ws_client_itf *itf, void *closure);
extern int afb_proto_ws_client_token_remove(struct afb_proto_ws *protows, uint16_t tokenid);
extern int afb_proto_ws_client_call(struct afb_proto_ws *protows, const char *verb, struct json_object *args, uint16_t sessionid, uint16_t tokenid, void *request, const char *user_creds);
extern int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(void*, struct json_object*), void *closure);
+extern int afb_proto_ws_client_event_unexpected(struct afb_proto_ws *protows, uint16_t eventid);
extern int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, uint16_t event_id, const char *event_name);
extern int afb_proto_ws_server_event_remove(struct afb_proto_ws *protows, uint16_t event_id);
rc = u16id2ptr_get(stubws->event_proxies, event_id, (void**)&event);
if (rc >= 0 && event)
- afb_evt_event_x2_push(event, data);
+ rc = afb_evt_event_x2_push(event, data);
else
ERROR("unreadable push event");
+ if (rc <= 0)
+ afb_proto_ws_client_event_unexpected(stubws->proto, event_id);
}
static void client_on_event_broadcast_cb(void *closure, const char *event_name, struct json_object *data, const uuid_binary_t uuid, uint8_t hop)
afb_token_unref(token);
}
+static void server_on_event_unexpected_cb(void *closure, uint16_t eventid)
+{
+ struct afb_stub_ws *stubws = closure;
+
+ afb_evt_watch_sub_eventid(stubws->listener, eventid);
+}
+
static void server_on_call_cb(void *closure, struct afb_proto_ws_call *call, const char *verb, struct json_object *args, uint16_t sessionid, uint16_t tokenid, const char *user_creds)
{
struct afb_stub_ws *stubws = closure;
.on_token_create = server_on_token_create_cb,
.on_token_remove = server_on_token_remove_cb,
.on_call = server_on_call_cb,
- .on_describe = server_on_describe_cb
+ .on_describe = server_on_describe_cb,
+ .on_event_unexpected = server_on_event_unexpected_cb
};
/* the interface for events pushing */