Add more grpc - Asyncstuff
[src/agl-compositor.git] / clients / main-grpc.cpp
similarity index 66%
rename from clients/grpc.cpp
rename to clients/main-grpc.cpp
index 503453d..5f35c85 100644 (file)
 #include <cstdio>
 #include <ctime>
 #include <algorithm>
+#include <queue>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
 
-#include "grpc.h"
+#include "shell.h"
+#include "log.h"
+#include "main-grpc.h"
+#include "grpc-async-cb.h"
 
-struct shell_data {
-       struct wl_display *wl_display;
-       struct agl_shell *shell;
-       struct agl_shell_ext *shell_ext;
-       Shell *aglShell;
-
-       bool wait_for_bound;
-       bool wait_for_doas;
-
-       bool bound_ok;
-       bool doas_ok;
-
-       uint32_t version;
-       struct wl_list output_list;     /** window_output::link */
-};
-
-struct window_output {
-       struct shell_data *shell_data;
-       struct wl_output *output;
-       char *name;
-       struct wl_list link;    /** display::output_list */
-};
-
-static struct shell_data *sh = nullptr;
-
-grpc::ServerUnaryReactor *
-GrpcServiceImpl::ActivateApp(grpc::CallbackServerContext *context,
-                            const ::agl_shell_ipc::ActivateRequest* request,
-                            google::protobuf::Empty* /*response*/)
-{
-       fprintf(stderr, "activating app %s on output %s\n",
-                       request->app_id().c_str(),
-                       request->output_name().c_str());
-
-       sh->aglShell->ActivateApp(request->app_id(), request->output_name());
-
-       grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
-       reactor->Finish(grpc::Status::OK);
-       return reactor;
-}
-
-grpc::ServerUnaryReactor *
-GrpcServiceImpl::DeactivateApp(grpc::CallbackServerContext *context,
-                              const ::agl_shell_ipc::DeactivateRequest* request,
-                              google::protobuf::Empty* /*response*/)
-{
-       sh->aglShell->DeactivateApp(request->app_id());
-
-       grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
-       reactor->Finish(grpc::Status::OK);
-       return reactor;
-}
-
-grpc::ServerUnaryReactor *
-GrpcServiceImpl::SetAppFloat(grpc::CallbackServerContext *context,
-                            const ::agl_shell_ipc::FloatRequest* request,
-                            google::protobuf::Empty* /* response */)
-{
-       sh->aglShell->SetAppFloat(request->app_id());
-
-       grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
-       reactor->Finish(grpc::Status::OK);
-       return reactor;
-}
-
-grpc::ServerUnaryReactor *
-GrpcServiceImpl::SetAppSplit(grpc::CallbackServerContext *context,
-           const ::agl_shell_ipc::SplitRequest* request,
-           google::protobuf::Empty* /*response*/)
-{
-       sh->aglShell->SetAppSplit(request->app_id(), request->tile_orientation());
-
-       grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
-       reactor->Finish(grpc::Status::OK);
-       return reactor;
-}
-
-void
-Shell::ActivateApp(const std::string &app_id, const std::string &output_name)
-{
-       struct window_output *woutput, *w_output;
-
-       woutput = nullptr;
-       w_output = nullptr;
-
-       wl_list_for_each(woutput, &sh->output_list, link) {
-               if (woutput->name && !strcmp(woutput->name, output_name.c_str())) {
-                       w_output = woutput;
-                       break;
-               }
-       }
-
-       // else, get the first one available
-       if (!w_output)
-               w_output = wl_container_of(sh->output_list.prev, w_output, link);
-
-       agl_shell_activate_app(this->m_shell.get(), app_id.c_str(), w_output->output);
-       wl_display_flush(sh->wl_display);
-}
-
-void
-Shell::DeactivateApp(const std::string &app_id)
-{
-       (void) app_id;
-}
-
-void
-Shell::SetAppFloat(const std::string &app_id)
-{
-       (void) app_id;
-}
-
-void
-Shell::SetAppSplit(const std::string &app_id, uint32_t orientation)
-{
-       (void) app_id;
-       (void) orientation;
-}
-
-static void
-start_grpc_server(void)
-{
-       // instantiante the grpc server
-       std::string server_address(kDefaultGrpcServiceAddress);
-       GrpcServiceImpl service;
-
-       grpc::EnableDefaultHealthCheckService(true);
-       grpc::reflection::InitProtoReflectionServerBuilderPlugin();
-
-       grpc::ServerBuilder builder;
-       builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
-       builder.RegisterService(&service);
-
-       std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
-       fprintf(stderr, "Server listening on %s\n", server_address.c_str());
-
-       server->Wait();
-}
+static int running = 1;
 
 static void
 agl_shell_bound_ok(void *data, struct agl_shell *agl_shell)
@@ -169,10 +39,24 @@ static void
 agl_shell_app_state(void *data, struct agl_shell *agl_shell,
                const char *app_id, uint32_t state)
 {
-       (void) data;
        (void) agl_shell;
-       (void) app_id;
-       (void) state;
+       struct shell_data *sh = static_cast<struct shell_data *>(data);
+       LOG("got app_state event app_id %s,  state %d\n", app_id, state);
+
+       if (sh->server_context_list.empty())
+               return;
+
+       ::agl_shell_ipc::AppState app;
+
+       sh->current_app_state.set_app_id(std::string(app_id));
+       sh->current_app_state.set_state(state);
+
+       auto start = sh->server_context_list.begin();
+       while (start != sh->server_context_list.end()) {
+               LOG("writing to lister %p\n", static_cast<void *>(start->second));
+               start->second->NextWrite();
+               start++;
+       }
 }
 
 static const struct agl_shell_listener shell_listener = {
@@ -294,7 +178,6 @@ destroy_output(struct window_output *w_output)
        free(w_output);
 }
 
-
 static void
 global_add(void *data, struct wl_registry *reg, uint32_t id,
                const char *interface, uint32_t version)
@@ -340,7 +223,8 @@ global_add_ext(void *data, struct wl_registry *reg, uint32_t id,
                        static_cast<struct agl_shell_ext *>(wl_registry_bind(reg, id,
                                        &agl_shell_ext_interface, std::min(static_cast<uint32_t>(1),
                                                                           version)));
-               agl_shell_ext_add_listener(sh->shell_ext, &shell_ext_listener, data);
+               agl_shell_ext_add_listener(sh->shell_ext,
+                                          &shell_ext_listener, data);
        }
 }
 
@@ -364,7 +248,7 @@ static const struct wl_registry_listener registry_listener = {
 };
 
 static void
-register_shell_ext(struct wl_display *wl_display)
+register_shell_ext(struct wl_display *wl_display, struct shell_data *sh)
 {
        struct wl_registry *registry;
 
@@ -377,7 +261,7 @@ register_shell_ext(struct wl_display *wl_display)
 }
 
 static void
-register_shell(struct wl_display *wl_display)
+register_shell(struct wl_display *wl_display, struct shell_data *sh)
 {
        struct wl_registry *registry;
 
@@ -391,7 +275,21 @@ register_shell(struct wl_display *wl_display)
        wl_registry_destroy(registry);
 }
 
-static int
+static void
+destroy_shell_data(struct shell_data *sh)
+{
+        struct window_output *w_output, *w_output_next;
+
+        wl_list_for_each_safe(w_output, w_output_next, &sh->output_list, link)
+                destroy_output(w_output);
+
+        wl_display_flush(sh->wl_display);
+        wl_display_disconnect(sh->wl_display);
+
+       delete sh;
+}
+
+static struct shell_data *
 start_agl_shell_client(void)
 {
        int ret = 0;
@@ -399,22 +297,25 @@ start_agl_shell_client(void)
 
        wl_display = wl_display_connect(NULL);
 
-       sh = new struct shell_data;
+       struct shell_data *sh = new struct shell_data;
+
        sh->wl_display = wl_display;
        sh->wait_for_doas = true;
        sh->wait_for_bound = true;
 
-       register_shell_ext(wl_display);
+       register_shell_ext(wl_display, sh);
 
        // check for agl_shell_ext
        if (!sh->shell_ext) {
                fprintf(stderr, "Failed to bind to agl_shell_ext interface\n");
-               return -1;
+               delete sh;
+               return nullptr;
        }
 
        if (wl_list_empty(&sh->output_list)) {
                fprintf(stderr, "Failed get any outputs!\n");
-               return -1;
+               delete sh;
+               return nullptr;
        }
 
        agl_shell_ext_doas_shell_client(sh->shell_ext);
@@ -426,11 +327,12 @@ start_agl_shell_client(void)
 
        if (!sh->doas_ok) {
                fprintf(stderr, "Failed to get doas_done event\n");
-               return -1;
+               delete sh;
+               return nullptr;
        }
 
        // bind to agl-shell
-       register_shell(wl_display);
+       register_shell(wl_display, sh);
        while (ret != -1 && sh->wait_for_bound) {
                ret = wl_display_dispatch(sh->wl_display);
                if (sh->wait_for_bound)
@@ -440,34 +342,40 @@ start_agl_shell_client(void)
        // at this point, we can't do anything about it
        if (!sh->bound_ok) {
                fprintf(stderr, "Failed to get bound_ok event!\n");
-               return -1;
+               delete sh;
+               return nullptr;
        }
 
        fprintf(stderr, "agl_shell/agl_shell_ext interface OK\n");
-       std::shared_ptr<struct agl_shell> agl_shell{sh->shell, agl_shell_destroy};
-       sh->aglShell = new Shell(agl_shell);
 
-       return 0;
+       return sh; 
 }
 
 static void
-destroy_shell_data(void)
+start_grpc_server(Shell *aglShell)
 {
-        struct window_output *w_output, *w_output_next;
+       // instantiante the grpc server
+       std::string server_address(kDefaultGrpcServiceAddress);
+       GrpcServiceImpl service{aglShell};
 
-        wl_list_for_each_safe(w_output, w_output_next, &sh->output_list, link)
-                destroy_output(w_output);
+       grpc::EnableDefaultHealthCheckService(true);
+       grpc::reflection::InitProtoReflectionServerBuilderPlugin();
 
-        wl_display_flush(sh->wl_display);
-        wl_display_disconnect(sh->wl_display);
+       grpc::ServerBuilder builder;
+       builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
+       builder.RegisterService(&service);
 
-       delete sh;
+       std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
+       LOG("Server listening on %s\n", server_address.c_str());
+
+       server->Wait();
 }
 
 int main(int argc, char **argv)
 {
        (void) argc;
        (void) argv;
+       Shell *aglShell;
        int ret = 0;
 
        // do not start right up, give shell client time to boot up
@@ -479,14 +387,22 @@ int main(int argc, char **argv)
 
        nanosleep(&ts, NULL);
 
-       ret = start_agl_shell_client();
-       if (ret) {
-               fprintf(stderr, "Failed to initialize agl-shell/agl-shell-ext\n");
+       struct shell_data *sh = start_agl_shell_client();
+       if (!sh) {
+               LOG("Failed to initialize agl-shell/agl-shell-ext\n");
                exit(EXIT_FAILURE);
        }
 
-       start_grpc_server();
+       std::shared_ptr<struct agl_shell> agl_shell{sh->shell, agl_shell_destroy};
+       aglShell = new Shell(agl_shell, sh);
+
+       std::thread thread(start_grpc_server, aglShell);
+
+       // serve wayland requests
+       while (running && ret != -1) {
+               ret = wl_display_dispatch(sh->wl_display);
+       }
 
-       destroy_shell_data();
+       destroy_shell_data(sh);
        return 0;
 }