api-dbus: improves events 61/5961/1
authorJosé Bollo <jose.bollo@iot.bzh>
Thu, 23 Jun 2016 09:04:00 +0000 (11:04 +0200)
committerJosé Bollo <jose.bollo@iot.bzh>
Thu, 23 Jun 2016 09:04:23 +0000 (11:04 +0200)
Change-Id: I0d58bed66ebc9eaea63c0863351d03cf458e4198
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
plugins/samples/HelloWorld.c
src/afb-api-dbus.c

index 259b42f..b6f49b7 100644 (file)
 
 const struct AFB_interface *interface;
 
+struct event
+{
+       struct event *next;
+       struct afb_event event;
+       char tag[1];
+};
+
+static struct event *events = 0;
+
+/* searchs the event of tag */
+static struct event *event_get(const char *tag)
+{
+       struct event *e = events;
+       while(e && strcmp(e->tag, tag))
+               e = e->next;
+       return e;
+}
+
+/* deletes the event of tag */
+static int event_del(const char *tag)
+{
+       struct event *e, **p;
+
+       /* check exists */
+       e = event_get(tag);
+       if (!e) return -1;
+
+       /* unlink */
+       p = &events;
+       while(*p != e) p = &(*p)->next;
+       *p = e->next;
+
+       /* destroys */
+       afb_event_drop(e->event);
+       free(e);
+       return 0;
+}
+
+/* creates the event of tag */
+static int event_add(const char *tag, const char *name)
+{
+       struct event *e;
+
+       /* check valid tag */
+       e = event_get(tag);
+       if (e) return -1;
+
+       /* creation */
+       e = malloc(strlen(tag) + sizeof *e);
+       if (!e) return -1;
+       strcpy(e->tag, tag);
+
+       /* make the event */
+       e->event = afb_daemon_make_event(interface->daemon, name);
+       if (!e->event.closure) { free(e); return -1; }
+
+       /* link */
+       e->next = events;
+       events = e;
+       return 0;
+}
+
+static int event_subscribe(struct afb_req request, const char *tag)
+{
+       struct event *e;
+       e = event_get(tag);
+       return e ? afb_req_subscribe(request, e->event) : -1;
+}
+
+static int event_unsubscribe(struct afb_req request, const char *tag)
+{
+       struct event *e;
+       e = event_get(tag);
+       return e ? afb_req_unsubscribe(request, e->event) : -1;
+}
+
+static int event_push(struct json_object *args, const char *tag)
+{
+       struct event *e;
+       e = event_get(tag);
+       return e ? afb_event_push(e->event, json_object_get(args)) : -1;
+}
+
 // Sample Generic Ping Debug API
 static void ping(struct afb_req request, json_object *jresp, const char *tag)
 {
@@ -99,6 +182,69 @@ static void subcall (struct afb_req request)
                afb_req_subcall(request, api, verb, object, subcallcb, afb_req_store(request));
 }
 
+static void eventadd (struct afb_req request)
+{
+       const char *tag = afb_req_value(request, "tag");
+       const char *name = afb_req_value(request, "name");
+
+       if (tag == NULL || name == NULL)
+               afb_req_fail(request, "failed", "bad arguments");
+       else if (0 != event_add(tag, name))
+               afb_req_fail(request, "failed", "creation error");
+       else
+               afb_req_success(request, NULL, NULL);
+}
+
+static void eventdel (struct afb_req request)
+{
+       const char *tag = afb_req_value(request, "tag");
+
+       if (tag == NULL)
+               afb_req_fail(request, "failed", "bad arguments");
+       else if (0 != event_del(tag))
+               afb_req_fail(request, "failed", "deletion error");
+       else
+               afb_req_success(request, NULL, NULL);
+}
+
+static void eventsub (struct afb_req request)
+{
+       const char *tag = afb_req_value(request, "tag");
+
+       if (tag == NULL)
+               afb_req_fail(request, "failed", "bad arguments");
+       else if (0 != event_subscribe(request, tag))
+               afb_req_fail(request, "failed", "subscription error");
+       else
+               afb_req_success(request, NULL, NULL);
+}
+
+static void eventunsub (struct afb_req request)
+{
+       const char *tag = afb_req_value(request, "tag");
+
+       if (tag == NULL)
+               afb_req_fail(request, "failed", "bad arguments");
+       else if (0 != event_unsubscribe(request, tag))
+               afb_req_fail(request, "failed", "unsubscription error");
+       else
+               afb_req_success(request, NULL, NULL);
+}
+
+static void eventpush (struct afb_req request)
+{
+       const char *tag = afb_req_value(request, "tag");
+       const char *data = afb_req_value(request, "data");
+       json_object *object = data ? json_tokener_parse(data) : NULL;
+
+       if (tag == NULL)
+               afb_req_fail(request, "failed", "bad arguments");
+       else if (0 > event_push(object, tag))
+               afb_req_fail(request, "failed", "push error");
+       else
+               afb_req_success(request, NULL, NULL);
+}
+
 // NOTE: this sample does not use session to keep test a basic as possible
 //       in real application most APIs should be protected with AFB_SESSION_CHECK
 static const struct AFB_verb_desc_v1 verbs[]= {
@@ -109,6 +255,11 @@ static const struct AFB_verb_desc_v1 verbs[]= {
   {"pingJson" , AFB_SESSION_NONE, pingJson    , "Return a JSON object"},
   {"pingevent", AFB_SESSION_NONE, pingEvent   , "Send an event"},
   {"subcall",   AFB_SESSION_NONE, subcall     , "Call api/verb(args)"},
+  {"eventadd",  AFB_SESSION_NONE, eventadd    , "adds the event of 'name' for the 'tag'"},
+  {"eventdel",  AFB_SESSION_NONE, eventdel    , "deletes the event of 'tag'"},
+  {"eventsub",  AFB_SESSION_NONE, eventsub    , "subscribes to the event of 'tag'"},
+  {"eventunsub",AFB_SESSION_NONE, eventunsub  , "unsubscribes to the event of 'tag'"},
+  {"eventpush", AFB_SESSION_NONE, eventpush   , "pushs the event of 'tag' with the 'data'"},
   {NULL}
 };
 
index 054755a..4c5d908 100644 (file)
 
 static const char DEFAULT_PATH_PREFIX[] = "/org/agl/afb/api/";
 
+struct dbus_memo;
+struct dbus_event;
+struct destination;
+
 /*
  * The path given are of the form
  *     system:/org/agl/afb/api/...
@@ -51,11 +55,22 @@ static const char DEFAULT_PATH_PREFIX[] = "/org/agl/afb/api/";
 struct api_dbus
 {
        struct sd_bus *sdbus;   /* the bus */
-       struct sd_bus_slot *slot; /* the slot */
        char *path;             /* path of the object for the API */
        char *name;             /* name/interface of the object */
        char *api;              /* api name of the interface */
-       struct afb_evt_listener *listener;
+       union {
+               struct {
+                       struct sd_bus_slot *slot_broadcast;
+                       struct sd_bus_slot *slot_event;
+                       struct dbus_event *events;
+                       struct dbus_memo *memos;
+               } client;
+               struct {
+                       struct sd_bus_slot *slot_call;
+                       struct afb_evt_listener *listener; /* listener for broadcasted events */
+                       struct destination *destinations;
+               } server;
+       };
 };
 
 #define RETOK   1
@@ -200,12 +215,23 @@ static void destroy_api_dbus(struct api_dbus *api)
  * structure for recording query data
  */
 struct dbus_memo {
+       struct dbus_memo *next;         /* the next memo */
+       struct api_dbus *api;           /* the dbus api */
        struct afb_req req;             /* the request handle */
        struct afb_context *context;    /* the context of the query */
+       uint64_t msgid;                 /* the message identifier */
+};
+
+struct dbus_event
+{
+       struct dbus_event *next;
+       struct afb_event event;
+       int id;
+       int refcount;
 };
 
 /* allocates and init the memorizing data */
-static struct dbus_memo *api_dbus_client_make_memo(struct afb_req req, struct afb_context *context)
+static struct dbus_memo *api_dbus_client_memo_make(struct api_dbus *api, struct afb_req req, struct afb_context *context)
 {
        struct dbus_memo *memo;
 
@@ -214,17 +240,44 @@ static struct dbus_memo *api_dbus_client_make_memo(struct afb_req req, struct af
                afb_req_addref(req);
                memo->req = req;
                memo->context = context;
+               memo->msgid = 0;
+               memo->api = api;
+               memo->next = api->client.memos;
+               api->client.memos = memo;
        }
        return memo;
 }
 
 /* free and release the memorizing data */
-static void api_dbus_client_free_memo(struct dbus_memo *memo)
+static void api_dbus_client_memo_destroy(struct dbus_memo *memo)
 {
+       struct dbus_memo **prv;
+
+       prv = &memo->api->client.memos;
+       while (*prv != NULL) {
+               if (*prv == memo) {
+                       *prv = memo->next;
+                       break;
+               }
+               prv = &(*prv)->next;
+       }
+
        afb_req_unref(memo->req);
        free(memo);
 }
 
+/* search a memorized request */
+static struct dbus_memo *api_dbus_client_memo_search(struct api_dbus *api, uint64_t msgid)
+{
+       struct dbus_memo *memo;
+
+       memo = api->client.memos;
+       while (memo != NULL && memo->msgid != msgid)
+               memo = memo->next;
+
+       return memo;
+}
+
 /* callback when received answer */
 static int api_dbus_client_on_reply(sd_bus_message *message, void *userdata, sd_bus_error *ret_error)
 {
@@ -247,10 +300,10 @@ static int api_dbus_client_on_reply(sd_bus_message *message, void *userdata, sd_
                memo->context->flags = (unsigned)flags;
                switch(type) {
                case RETOK:
-                       afb_req_success(memo->req, json_tokener_parse(first), second);
+                       afb_req_success(memo->req, json_tokener_parse(first), *second ? second : NULL);
                        break;
                case RETERR:
-                       afb_req_fail(memo->req, first, second);
+                       afb_req_fail(memo->req, first, *second ? second : NULL);
                        break;
                case RETRAW:
                        afb_req_send(memo->req, first, strlen(first));
@@ -260,7 +313,7 @@ static int api_dbus_client_on_reply(sd_bus_message *message, void *userdata, sd_
                        break;
                }
        }
-       api_dbus_client_free_memo(memo);
+       api_dbus_client_memo_destroy(memo);
        return 1;
 }
 
@@ -271,29 +324,44 @@ static void api_dbus_client_call(struct api_dbus *api, struct afb_req req, struc
        int rc;
        char *method = strndupa(verb, lenverb);
        struct dbus_memo *memo;
+       struct sd_bus_message *msg;
 
        /* create the recording data */
-       memo = api_dbus_client_make_memo(req, context);
+       memo = api_dbus_client_memo_make(api, req, context);
        if (memo == NULL) {
                afb_req_fail(req, "error", "out of memory");
                return;
        }
 
-       /* makes the call */
-       rc = sd_bus_call_method_async(api->sdbus, NULL,
-               api->name, api->path, api->name, method,
-               api_dbus_client_on_reply, memo,
-               "ssu",
+       /* creates the message */
+       msg = NULL;
+       rc = sd_bus_message_new_method_call(api->sdbus, &msg, api->name, api->path, api->name, method);
+       if (rc < 0)
+               goto error;
+
+       rc = sd_bus_message_append(msg, "ssu",
                        afb_req_raw(req, &size),
                        ctxClientGetUuid(context->session),
                        (uint32_t)context->flags);
+       if (rc < 0)
+               goto error;
+
+       /* makes the call */
+       rc = sd_bus_call_async(api->sdbus, NULL, msg, api_dbus_client_on_reply, memo, (uint64_t)-1);
+       if (rc < 0)
+               goto error;
+
+       rc = sd_bus_message_get_cookie(msg, &memo->msgid);
+       if (rc >= 0)
+               goto end;
 
+error:
        /* if there was an error report it directly */
-       if (rc < 0) {
-               errno = -rc;
-               afb_req_fail(req, "error", "dbus error");
-               api_dbus_client_free_memo(memo);
-       }
+       errno = -rc;
+       afb_req_fail(req, "error", "dbus error");
+       api_dbus_client_memo_destroy(memo);
+end:
+       sd_bus_message_unref(msg);
 }
 
 static int api_dbus_service_start(struct api_dbus *api, int share_session, int onneed)
@@ -307,18 +375,208 @@ static int api_dbus_service_start(struct api_dbus *api, int share_session, int o
        return -1;
 }
 
-/* receives events */
-static int api_dbus_client_on_event(sd_bus_message *m, void *userdata, sd_bus_error *ret_error)
+/* receives broadcasted events */
+static int api_dbus_client_on_broadcast_event(sd_bus_message *m, void *userdata, sd_bus_error *ret_error)
 {
        struct json_object *object;
        const char *event, *data;
        int rc = sd_bus_message_read(m, "ss", &event, &data);
        if (rc < 0)
-               ERROR("unreadable event");
+               ERROR("unreadable broadcasted event");
        else {
                object = json_tokener_parse(data);
                afb_evt_broadcast(event, object);
-               json_object_put(object);
+       }
+       return 1;
+}
+
+/* search the event */
+static struct dbus_event *api_dbus_client_event_search(struct api_dbus *api, int id, const char *name)
+{
+       struct dbus_event *ev;
+
+       ev = api->client.events;
+       while (ev != NULL && (ev->id != id || 0 != strcmp(afb_evt_event_name(ev->event), name)))
+               ev = ev->next;
+
+       return ev;
+}
+
+/* adds an event */
+static void api_dbus_client_event_create(struct api_dbus *api, int id, const char *name)
+{
+       struct dbus_event *ev;
+
+       /* check conflicts */
+       ev = api_dbus_client_event_search(api, id, name);
+       if (ev != NULL) {
+               ev->refcount++;
+               return;
+       }
+
+       /* no conflict, try to add it */
+       ev = malloc(sizeof *ev);
+       if (ev != NULL) {
+               ev->event = afb_evt_create_event(name);
+               if (ev->event.closure == NULL)
+                       free(ev);
+               else {
+                       ev->refcount = 1;
+                       ev->id = id;
+                       ev->next = api->client.events;
+                       api->client.events = ev;
+                       return;
+               }
+       }
+       ERROR("can't create event %s, out of memory", name);
+}
+
+/* removes an event */
+static void api_dbus_client_event_drop(struct api_dbus *api, int id, const char *name)
+{
+       struct dbus_event *ev, **prv;
+
+       /* retrieves the event */
+       ev = api_dbus_client_event_search(api, id, name);
+       if (ev == NULL) {
+               ERROR("event %s not found", name);
+               return;
+       }
+
+       /* decrease the reference count */
+       if (--ev->refcount)
+               return;
+
+       /* unlinks the event */
+       prv = &api->client.events;
+       while (*prv != ev)
+               prv = &(*prv)->next;
+       *prv = ev->next;
+
+       /* destroys the event */
+       afb_event_drop(ev->event);
+       free(ev);
+}
+
+/* pushs an event */
+static void api_dbus_client_event_push(struct api_dbus *api, int id, const char *name, const char *data)
+{
+       struct json_object *object;
+       struct dbus_event *ev;
+
+       /* retrieves the event */
+       ev = api_dbus_client_event_search(api, id, name);
+       if (ev == NULL) {
+               ERROR("event %s not found", name);
+               return;
+       }
+
+       /* destroys the event */
+       object = json_tokener_parse(data);
+       afb_event_push(ev->event, object);
+}
+
+/* subscribes an event */
+static void api_dbus_client_event_subscribe(struct api_dbus *api, int id, const char *name, uint64_t msgid)
+{
+       int rc;
+       struct dbus_event *ev;
+       struct dbus_memo *memo;
+
+       /* retrieves the event */
+       ev = api_dbus_client_event_search(api, id, name);
+       if (ev == NULL) {
+               ERROR("event %s not found", name);
+               return;
+       }
+
+       /* retrieves the memo */
+       memo = api_dbus_client_memo_search(api, msgid);
+       if (memo == NULL) {
+               ERROR("message not found");
+               return;
+       }
+
+       /* subscribe the request to the event */
+       rc = afb_req_subscribe(memo->req, ev->event);
+       if (rc < 0)
+               ERROR("can't subscribe: %m");
+}
+
+/* unsubscribes an event */
+static void api_dbus_client_event_unsubscribe(struct api_dbus *api, int id, const char *name, uint64_t msgid)
+{
+       int rc;
+       struct dbus_event *ev;
+       struct dbus_memo *memo;
+
+       /* retrieves the event */
+       ev = api_dbus_client_event_search(api, id, name);
+       if (ev == NULL) {
+               ERROR("event %s not found", name);
+               return;
+       }
+
+       /* retrieves the memo */
+       memo = api_dbus_client_memo_search(api, msgid);
+       if (memo == NULL) {
+               ERROR("message not found");
+               return;
+       }
+
+       /* unsubscribe the request from the event */
+       rc = afb_req_unsubscribe(memo->req, ev->event);
+       if (rc < 0)
+               ERROR("can't unsubscribe: %m");
+}
+
+/* receives calls for event */
+static int api_dbus_client_on_manage_event(sd_bus_message *m, void *userdata, sd_bus_error *ret_error)
+{
+       const char *eventname, *data;
+       int rc;
+       int32_t eventid;
+       uint8_t order;
+       struct api_dbus *api;
+       uint64_t msgid;
+
+       /* check if expected message */
+       api = userdata;
+       if (0 != strcmp(api->name, sd_bus_message_get_interface(m)))
+               return 0; /* not the expected interface */
+       if (0 != strcmp("event", sd_bus_message_get_member(m)))
+               return 0; /* not the expected member */
+       if (sd_bus_message_get_expect_reply(m))
+               return 0; /* not the expected type of message */
+
+       /* reads the message */
+       rc = sd_bus_message_read(m, "yisst", &order, &eventid, &eventname, &data, &msgid);
+       if (rc < 0) {
+               ERROR("unreadable event");
+               return 1;
+       }
+
+       /* what is the order ? */
+       switch ((char)order) {
+       case '+': /* creates the event */
+               api_dbus_client_event_create(api, eventid, eventname);
+               break;
+       case '-': /* drops the event */
+               api_dbus_client_event_drop(api, eventid, eventname);
+               break;
+       case '!': /* pushs the event */
+               api_dbus_client_event_push(api, eventid, eventname, data);
+               break;
+       case 'S': /* subscribe event for a request */
+               api_dbus_client_event_subscribe(api, eventid, eventname, msgid);
+               break;
+       case 'U': /* unsubscribe event for a request */
+               api_dbus_client_event_unsubscribe(api, eventid, eventname, msgid);
+               break;
+       default:
+               /* unexpected order */
+               ERROR("unexpected order '%c' received", (char)order);
+               break;
        }
        return 1;
 }
@@ -336,15 +594,23 @@ int afb_api_dbus_add_client(const char *path)
        if (api == NULL)
                goto error;
 
-       /* connect to events */
-       rc = asprintf(&match, "type='signal',path='%s',interface='%s',member='event'", api->path, api->name);
+       /* connect to broadcasted events */
+       rc = asprintf(&match, "type='signal',path='%s',interface='%s',member='broadcast'", api->path, api->name);
        if (rc < 0) {
                errno = ENOMEM;
                ERROR("out of memory");
                goto error;
        }
-       rc = sd_bus_add_match(api->sdbus, &api->slot, match, api_dbus_client_on_event, api);
+       rc = sd_bus_add_match(api->sdbus, &api->client.slot_broadcast, match, api_dbus_client_on_broadcast_event, api);
        free(match);
+       if (rc < 0) {
+               errno = -rc;
+               ERROR("can't add dbus match %s for %s", api->path, api->name);
+               goto error;
+       }
+
+       /* connect to event management */
+       rc = sd_bus_add_object(api->sdbus, &api->client.slot_event, api->path, api_dbus_client_on_manage_event, api);
        if (rc < 0) {
                errno = -rc;
                ERROR("can't add dbus object %s for %s", api->path, api->name);
@@ -366,6 +632,137 @@ error:
        return -1;
 }
 
+/******************* event structures for server part **********************************/
+
+static void afb_api_dbus_server_event_add(void *closure, const char *event, int eventid);
+static void afb_api_dbus_server_event_remove(void *closure, const char *event, int eventid);
+static void afb_api_dbus_server_event_push(void *closure, const char *event, int eventid, struct json_object *object);
+static void afb_api_dbus_server_event_broadcast(void *closure, const char *event, int eventid, struct json_object *object);
+
+/* the interface for events broadcasting */
+static const struct afb_evt_itf evt_broadcast_itf = {
+       .broadcast = afb_api_dbus_server_event_broadcast,
+};
+
+/* the interface for events pushing */
+static const struct afb_evt_itf evt_push_itf = {
+       .push = afb_api_dbus_server_event_push,
+       .add = afb_api_dbus_server_event_add,
+       .remove = afb_api_dbus_server_event_remove
+};
+
+/******************* destination description part for server *****************************/
+
+struct destination
+{
+       /* link to next different destination */
+       struct destination *next;
+
+       /* the server dbus-api */
+       struct api_dbus *api;
+
+       /* count of references */
+       int refcount;
+
+       /* the destination */
+       char name[1];
+};
+
+static struct destination *afb_api_dbus_server_destination_get(struct api_dbus *api, const char *sender)
+{
+       struct destination *destination;
+
+       /* searchs for an existing destination */
+       destination = api->server.destinations;
+       while (destination != NULL) {
+               if (0 == strcmp(destination->name, sender)) {
+                       destination->refcount++;
+                       return destination;
+               }
+               destination = destination->next;
+       }
+
+       /* not found, create it */
+       destination = malloc(strlen(sender) + sizeof *destination);
+       if (destination == NULL)
+               errno = ENOMEM;
+       else {
+               destination->api = api;
+               destination->refcount = 1;
+               strcpy(destination->name, sender);
+               destination->next = api->server.destinations;
+               api->server.destinations = destination;
+       }
+       return destination;
+}
+
+static void afb_api_dbus_server_destination_unref(struct destination *destination)
+{
+       if (!--destination->refcount) {
+               struct destination **prv;
+
+               prv = &destination->api->server.destinations;
+               while(*prv != destination)
+                       prv = &(*prv)->next;
+               *prv = destination->next;
+               free(destination);
+       }
+}
+
+struct listener
+{
+       /* link to next different destination */
+       struct destination *destination;
+
+       /* the listener of events */
+       struct afb_evt_listener *listener;
+};
+
+static void afb_api_dbus_server_listener_free(struct listener *listener)
+{
+       afb_evt_listener_unref(listener->listener);
+       afb_api_dbus_server_destination_unref(listener->destination);
+       free(listener);
+}
+
+static struct listener *afb_api_dbus_server_listerner_get(struct api_dbus *api, const char *sender, struct AFB_clientCtx *session)
+{
+       int rc;
+       struct listener *listener;
+       struct destination *destination;
+
+       /* get the destination */
+       destination = afb_api_dbus_server_destination_get(api, sender);
+       if (destination == NULL)
+               return NULL;
+
+       /* retrieves the stored listener */
+       listener = ctxClientCookieGet(session, destination);
+       if (listener != NULL) {
+               /* found */
+               afb_api_dbus_server_destination_unref(destination);
+               return listener;
+       }
+
+       /* creates the listener */
+       listener = malloc(sizeof *listener);
+       if (listener == NULL)
+               errno = ENOMEM;
+       else {
+               listener->destination = destination;
+               listener->listener = afb_evt_listener_create(&evt_push_itf, destination);
+               if (listener->listener != NULL) {
+                       rc = ctxClientCookieSet(session, destination, listener, (void*)afb_api_dbus_server_listener_free);
+                       if (rc == 0)
+                               return listener;
+                       afb_evt_listener_unref(listener->listener);
+               }
+               free(listener);
+       }
+       afb_api_dbus_server_destination_unref(destination);
+       return NULL;
+}
+
 /******************* dbus request part for server *****************/
 
 /*
@@ -376,6 +773,7 @@ struct dbus_req {
        sd_bus_message *message;        /* the incoming request message */
        const char *request;            /* the readen request as string */
        struct json_object *json;       /* the readen request as object */
+       struct listener *listener;      /* the listener for events */
        int refcount;                   /* reference count of the request */
 };
 
@@ -420,7 +818,7 @@ static void dbus_req_reply(struct dbus_req *dreq, uint8_t type, const char *firs
 {
        int rc;
        rc = sd_bus_reply_method_return(dreq->message,
-                       "yssu", type, first, second, (uint32_t)dreq->context.flags);
+                       "yssu", type, first ? : "", second ? : "", (uint32_t)dreq->context.flags);
        if (rc < 0)
                ERROR("sending the reply failed");
 }
@@ -448,14 +846,28 @@ static void dbus_req_send(struct dbus_req *dreq, const char *buffer, size_t size
        dbus_req_reply(dreq, RETRAW, buffer, "");
 }
 
+static void afb_api_dbus_server_event_send(struct destination *destination, char order, const char *event, int eventid, const char *data, uint64_t msgid);
+
 static int dbus_req_subscribe(struct dbus_req *dreq, struct afb_event event)
 {
-       return -1;
+       uint64_t msgid;
+       int rc;
+
+       rc = afb_evt_add_watch(dreq->listener->listener, event);
+       sd_bus_message_get_cookie(dreq->message, &msgid);
+       afb_api_dbus_server_event_send(dreq->listener->destination, 'S', afb_evt_event_name(event), afb_evt_event_id(event), "", msgid);
+       return rc;
 }
 
 static int dbus_req_unsubscribe(struct dbus_req *dreq, struct afb_event event)
 {
-       return -1;
+       uint64_t msgid;
+       int rc;
+
+       sd_bus_message_get_cookie(dreq->message, &msgid);
+       afb_api_dbus_server_event_send(dreq->listener->destination, 'U', afb_evt_event_name(event), afb_evt_event_id(event), "", msgid);
+       rc = afb_evt_remove_watch(dreq->listener->listener, event);
+       return rc;
 }
 
 static void dbus_req_subcall(struct dbus_req *dreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *closure);
@@ -485,6 +897,62 @@ static void dbus_req_subcall(struct dbus_req *dreq, const char *api, const char
 
 /******************* server part **********************************/
 
+static void afb_api_dbus_server_event_send(struct destination *destination, char order, const char *event, int eventid, const char *data, uint64_t msgid)
+{
+       int rc;
+       struct api_dbus *api;
+       struct sd_bus_message *msg;
+
+       api = destination->api;
+       msg = NULL;
+
+       rc = sd_bus_message_new_method_call(api->sdbus, &msg, destination->name, api->path, api->name, "event");
+       if (rc < 0)
+               goto error;
+
+       rc = sd_bus_message_append(msg, "yisst", (uint8_t)order, (int32_t)eventid, event, data, msgid);
+       if (rc < 0)
+               goto error;
+
+       rc = sd_bus_send(api->sdbus, msg, NULL); /* NULL for cookie implies no expected reply */
+       if (rc >= 0)
+               goto end;
+
+error:
+       ERROR("error while send event %c%s(%d) to %s", order, event, eventid, destination->name);
+end:
+       sd_bus_message_unref(msg);
+}
+
+static void afb_api_dbus_server_event_add(void *closure, const char *event, int eventid)
+{
+       afb_api_dbus_server_event_send(closure, '+', event, eventid, "", 0);
+}
+
+static void afb_api_dbus_server_event_remove(void *closure, const char *event, int eventid)
+{
+       afb_api_dbus_server_event_send(closure, '-', event, eventid, "", 0);
+}
+
+static void afb_api_dbus_server_event_push(void *closure, const char *event, int eventid, struct json_object *object)
+{
+       const char *data = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
+       afb_api_dbus_server_event_send(closure, '!', event, eventid, data, 0);
+}
+
+static void afb_api_dbus_server_event_broadcast(void *closure, const char *event, int eventid, struct json_object *object)
+{
+       int rc;
+       struct api_dbus *api;
+
+       api = closure;
+       rc = sd_bus_emit_signal(api->sdbus, api->path, api->name, "broadcast",
+                       "ss", event, json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN));
+       if (rc < 0)
+               ERROR("error while broadcasting event %s", event);
+       json_object_put(object);
+}
+
 /* called when the object for the service is called */
 static int api_dbus_server_on_object_called(sd_bus_message *message, void *userdata, sd_bus_error *ret_error)
 {
@@ -495,6 +963,8 @@ static int api_dbus_server_on_object_called(sd_bus_message *message, void *userd
        struct api_dbus *api = userdata;
        struct afb_req areq;
        uint32_t flags;
+       struct AFB_clientCtx *session;
+       struct listener *listener;
 
        /* check the interface */
        if (strcmp(sd_bus_message_get_interface(message), api->name) != 0)
@@ -505,55 +975,45 @@ static int api_dbus_server_on_object_called(sd_bus_message *message, void *userd
 
        /* create the request */
        dreq = calloc(1 , sizeof *dreq);
-       if (dreq == NULL) {
-               sd_bus_reply_method_errorf(message, SD_BUS_ERROR_NO_MEMORY, "out of memory");
-               return 1;
-       }
+       if (dreq == NULL)
+               goto out_of_memory;
 
        /* get the data */
        rc = sd_bus_message_read(message, "ssu", &dreq->request, &uuid, &flags);
        if (rc < 0) {
                sd_bus_reply_method_errorf(message, SD_BUS_ERROR_INVALID_SIGNATURE, "invalid signature");
-               free(dreq);
-               return 1;
+               goto error;
        }
 
        /* connect to the context */
-       if (afb_context_connect(&dreq->context, uuid, NULL) < 0) {
-               sd_bus_reply_method_errorf(message, SD_BUS_ERROR_NO_MEMORY, "out of memory");
-               free(dreq);
-               return 1;
-       }
+       if (afb_context_connect(&dreq->context, uuid, NULL) < 0)
+               goto out_of_memory;
+       session = dreq->context.session;
+
+       /* get the listener */
+       listener = afb_api_dbus_server_listerner_get(api, sd_bus_message_get_sender(message), session);
+       if (listener == NULL)
+               goto out_of_memory;
 
        /* fulfill the request and emit it */
        dreq->context.flags = flags;
        dreq->message = sd_bus_message_ref(message);
        dreq->json = NULL;
+       dreq->listener = listener;
        dreq->refcount = 1;
        areq.itf = &afb_api_dbus_req_itf;
        areq.closure = dreq;
        afb_apis_call_(areq, &dreq->context, api->api, method);
        dbus_req_unref(dreq);
        return 1;
-}
 
-static void afb_api_dbus_server_send_event(struct api_dbus *api, const char *event, int eventid, struct json_object *object)
-{
-       int rc;
-
-       rc = sd_bus_emit_signal(api->sdbus, api->path, api->name,
-                       "event", "ss", event, json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN));
-       if (rc < 0)
-               ERROR("error while emiting event %s", event);
-       json_object_put(object);
+out_of_memory:
+       sd_bus_reply_method_errorf(message, SD_BUS_ERROR_NO_MEMORY, "out of memory");
+error:
+       free(dreq);
+       return 1;
 }
 
-/* the interface for events */
-static const struct afb_evt_itf evt_itf = {
-       .broadcast = (void*)afb_api_dbus_server_send_event,
-       .push = (void*)afb_api_dbus_server_send_event
-};
-
 /* create the service */
 int afb_api_dbus_add_server(const char *path)
 {
@@ -574,7 +1034,7 @@ int afb_api_dbus_add_server(const char *path)
        }
 
        /* connect the service to the dbus object */
-       rc = sd_bus_add_object(api->sdbus, &api->slot, api->path, api_dbus_server_on_object_called, api);
+       rc = sd_bus_add_object(api->sdbus, &api->server.slot_call, api->path, api_dbus_server_on_object_called, api);
        if (rc < 0) {
                errno = -rc;
                ERROR("can't add dbus object %s for %s", api->path, api->name);
@@ -582,8 +1042,7 @@ int afb_api_dbus_add_server(const char *path)
        }
        INFO("afb service over dbus installed, name %s, path %s", api->name, api->path);
 
-       api->listener = afb_evt_listener_create(&evt_itf, api);
-
+       api->server.listener = afb_evt_listener_create(&evt_broadcast_itf, api);
        return 0;
 error3:
        sd_bus_release_name(api->sdbus, api->name);