Add more grpc - Asyncstuff sandbox/mvlad/switch-to-grpc
authorMarius Vlad <marius.vlad@collabora.com>
Tue, 18 Oct 2022 17:22:47 +0000 (20:22 +0300)
committerMarius Vlad <marius.vlad@collabora.com>
Tue, 25 Oct 2022 08:55:44 +0000 (11:55 +0300)
Switch to a more better structure

protocol: Add support for sending out app_state events over gRPC

Signed-off-by: Marius Vlad <marius.vlad@collabora.com>
Change-Id: I2765d53a2123be0d52225d92c964d39c63ec4902

16 files changed:
clients/grpc-async-cb.cpp [new file with mode: 0644]
clients/grpc-async-cb.h [new file with mode: 0644]
clients/grpc-async.cpp [new file with mode: 0644]
clients/grpc-async.h [new file with mode: 0644]
clients/grpc-sync.cpp [new file with mode: 0644]
clients/grpc-sync.h [new file with mode: 0644]
clients/grpc.h [deleted file]
clients/log.h [new file with mode: 0644]
clients/main-grpc.cpp [moved from clients/grpc.cpp with 66% similarity]
clients/main-grpc.h [new file with mode: 0644]
clients/meson.build
clients/shell.cpp [new file with mode: 0644]
clients/shell.h [new file with mode: 0644]
protocol/agl_shell.proto
src/ivi-compositor.h
src/shell.c

diff --git a/clients/grpc-async-cb.cpp b/clients/grpc-async-cb.cpp
new file mode 100644 (file)
index 0000000..71cd57e
--- /dev/null
@@ -0,0 +1,111 @@
+#include <cstdio>
+#include <ctime>
+#include <algorithm>
+#include <queue>
+
+#include <grpc/grpc.h>
+#include <grpcpp/grpcpp.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
+#include <grpcpp/server_context.h>
+
+#include <grpcpp/ext/proto_server_reflection_plugin.h>
+#include <grpcpp/health_check_service_interface.h>
+
+#include "log.h"
+#include "agl_shell.grpc.pb.h"
+#include "grpc-async-cb.h"
+
+Lister::Lister(Shell *shell) : m_shell(shell)
+{
+       // don't call NextWrite() just yet we do it explicitly when getting
+       // the events from the compositor
+}
+
+void
+Lister::OnDone()
+{
+       delete this;
+}
+
+void Lister::OnWriteDone(bool ok)
+{
+       LOG("ok %d\n", ok);
+       if (ok) {
+               // normally we should finish here, but we don't do that to keep
+               // the channel open
+               //Finish(grpc::Status::OK);
+       }
+}
+
+void 
+Lister::NextWrite(void)
+{
+       // we're going to have a Lister instance per client so we're safe here
+       StartWrite(&m_shell->m_shell_data->current_app_state);
+}
+
+grpc::ServerUnaryReactor *
+GrpcServiceImpl::ActivateApp(grpc::CallbackServerContext *context,
+                            const ::agl_shell_ipc::ActivateRequest* request,
+                            google::protobuf::Empty* /*response*/)
+{
+       LOG("activating app %s on output %s\n", request->app_id().c_str(),
+                                               request->output_name().c_str());
+
+       m_aglShell->ActivateApp(request->app_id(), request->output_name());
+
+       grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
+       reactor->Finish(grpc::Status::OK);
+       return reactor;
+}
+
+grpc::ServerUnaryReactor *
+GrpcServiceImpl::DeactivateApp(grpc::CallbackServerContext *context,
+                              const ::agl_shell_ipc::DeactivateRequest* request,
+                              google::protobuf::Empty* /*response*/)
+{
+       m_aglShell->DeactivateApp(request->app_id());
+
+       grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
+       reactor->Finish(grpc::Status::OK);
+       return reactor;
+}
+
+grpc::ServerUnaryReactor *
+GrpcServiceImpl::SetAppFloat(grpc::CallbackServerContext *context,
+                            const ::agl_shell_ipc::FloatRequest* request,
+                            google::protobuf::Empty* /* response */)
+{
+       m_aglShell->SetAppFloat(request->app_id());
+
+       grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
+       reactor->Finish(grpc::Status::OK);
+       return reactor;
+}
+
+grpc::ServerUnaryReactor *
+GrpcServiceImpl::SetAppSplit(grpc::CallbackServerContext *context,
+           const ::agl_shell_ipc::SplitRequest* request,
+           google::protobuf::Empty* /*response*/)
+{
+       m_aglShell->SetAppSplit(request->app_id(), request->tile_orientation());
+
+       grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
+       reactor->Finish(grpc::Status::OK);
+       return reactor;
+}
+
+grpc::ServerWriteReactor<::agl_shell_ipc::AppState>*
+GrpcServiceImpl::AppStatusState(grpc::CallbackServerContext* context,
+                                const google::protobuf::Empty*)
+{
+
+       Lister *n = new Lister(m_aglShell);
+
+       m_aglShell->m_shell_data->server_context_list.push_back(std::pair(context, n));
+       LOG("added lister %p\n", static_cast<void *>(n));
+
+       // just return  a Lister to keep the channel open
+       return n;
+}
diff --git a/clients/grpc-async-cb.h b/clients/grpc-async-cb.h
new file mode 100644 (file)
index 0000000..ce89f68
--- /dev/null
@@ -0,0 +1,64 @@
+#pragma once
+
+#include <memory>
+
+#include <grpc/grpc.h>
+#include <grpcpp/grpcpp.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
+#include <grpcpp/server_context.h>
+
+#include <mutex>
+#include <condition_variable>
+
+#include <grpcpp/ext/proto_server_reflection_plugin.h>
+#include <grpcpp/health_check_service_interface.h>
+
+#include "shell.h"
+#include "agl_shell.grpc.pb.h"
+
+namespace {
+       const char kDefaultGrpcServiceAddress[] = "127.0.0.1:14005";
+}
+
+class Lister : public grpc::ServerWriteReactor<::agl_shell_ipc::AppState> {
+public:
+       Lister(Shell *aglShell);
+       void OnDone() override;
+       void OnWriteDone(bool ok) override;
+       void NextWrite(void);
+private:
+       Shell *m_shell;
+};
+
+class GrpcServiceImpl final : public agl_shell_ipc::AglShellManagerService::CallbackService {
+public:
+       GrpcServiceImpl(Shell *aglShell) : m_aglShell(aglShell) {}
+
+       grpc::ServerUnaryReactor *ActivateApp(grpc::CallbackServerContext *context,
+                       const ::agl_shell_ipc::ActivateRequest* request,
+                       google::protobuf::Empty* /*response*/) override;
+
+       grpc::ServerUnaryReactor *DeactivateApp(grpc::CallbackServerContext *context,
+                       const ::agl_shell_ipc::DeactivateRequest* request,
+                       google::protobuf::Empty* /*response*/) override;
+
+       grpc::ServerUnaryReactor *SetAppSplit(grpc::CallbackServerContext *context,
+                       const ::agl_shell_ipc::SplitRequest* request,
+                       google::protobuf::Empty* /*response*/) override;
+
+       grpc::ServerUnaryReactor *SetAppFloat(grpc::CallbackServerContext *context,
+                       const ::agl_shell_ipc::FloatRequest* request,
+                       google::protobuf::Empty* /*response*/) override;
+
+       grpc::ServerWriteReactor< ::agl_shell_ipc::AppState>* AppStatusState(
+             ::grpc::CallbackServerContext* /*context*/,
+             const ::google::protobuf::Empty* /*request*/)  override;
+private:
+       Shell *m_aglShell;
+
+       std::mutex m_done_mutex;
+       std::condition_variable m_done_cv;
+       bool m_done = false;
+
+};
diff --git a/clients/grpc-async.cpp b/clients/grpc-async.cpp
new file mode 100644 (file)
index 0000000..72090fe
--- /dev/null
@@ -0,0 +1,121 @@
+#include <cstdio>
+#include <ctime>
+#include <algorithm>
+#include <queue>
+
+#include <grpc/grpc.h>
+#include <grpcpp/grpcpp.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
+#include <grpcpp/server_context.h>
+
+#include <grpcpp/ext/proto_server_reflection_plugin.h>
+#include <grpcpp/health_check_service_interface.h>
+
+#include "agl_shell.grpc.pb.h"
+#include "grpc-async.h"
+
+void
+CallData::Proceed(void)
+{
+       switch (m_status) {
+       case CREATE:
+               // Make this instance progress to the PROCESS state.
+               m_status = PROCESS;
+               std::cout << "Creating Call data for new client connections: "
+                       << this << std::endl;
+
+               // As part of the initial CREATE state, we *request* that the
+               // system start processing AppStatusState requests.
+               //
+               // In this request, "this" acts are the tag uniquely
+               // identifying the request (so that different CallData
+               // instances can serve different requests concurrently), in
+               // this case the memory address of this CallData instance.
+               m_service->RequestAppStatusState(&m_ctx, &m_request, &m_responder,
+                                                m_cq, m_cq, (void *) this);
+               break;
+       case PROCESS:
+               // Spawn a new CallData instance to serve new clients while we
+               // process the one for this CallData. The instance will
+               // deallocate itself as part of its FINISH state.
+               CallData *cd = new CallData(m_service, m_cq);
+
+               // The actual processing.
+               m_status = PROCESSING;
+               m_repliesSent++;
+               break;
+       case PROCESSING:
+               if (m_repliesSent == MAX_REPLIES) {
+                       // And we are done! Let the gRPC runtime know we've
+                       // finished, using the memory address of this instance
+                       // as the uniquely identifying tag for the event.
+                       m_status = FINISH;
+                       m_responder.Finish(Status::OK, this);
+               } else {
+                       // The actual processing.
+                       m_status = PROCESSING;
+                       m_repliesSent++;
+               }
+               break;
+       case FINISH:
+               GPR_ASSERT(m_status == FINISH);
+               std::cout << "Completed RPC for: " << this << std::endl;
+               // Once in the FINISH state, deallocate ourselves (CallData).
+               delete this;
+               break;
+       default:
+               break;
+       }
+}
+
+GrpcServiceImpl::~GrpcServiceImpl()
+{
+       m_server->Shutdown();
+       // Always shutdown the completion queue after the server.
+       m_cq->Shutdown();
+}
+
+void
+GrpcServiceImpl::Run(void)
+{
+       std::string server_address(kDefaultGrpcServiceAddress);
+
+       grpc::ServerBuilder builder;
+       builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
+
+       builder.RegisterService(&m_service);
+       m_cq = builder.AddCompletionQueue();
+
+       m_server = builder.BuildAndStart();
+       std::cout << "Server listening on " << server_address << std::endl;
+
+       // Proceed to the server's main loop.
+       HandleRpcs();
+}
+
+void
+GrpcServiceImpl::HandleRpcs(void)
+{
+       // Spawn a new CallData instance to serve new clients.
+       CallData *cd = new CallData(&m_service, m_cq.get());
+
+       // uniquely identifies a request.
+       void *tag;
+       bool ok;
+
+       // Block waiting to read the next event from the completion queue. The
+       // event is uniquely identified by its tag, which in this case is the
+       // memory address of a CallData instance.
+       //
+       // The return value of Next should always be checked. This return value
+       // tells us whether there is any kind of event or cq_ is shutting down.
+       while (true) {
+               std::cout << "Blocked on next waiting for events" << std::endl;
+               GPR_ASSERT(m_cq->Next(&tag, &ok));
+               GPR_ASSERT(ok);
+
+               std::cout << "Calling tag " << tag << " with Proceed()" << std::endl;
+               static_cast<CallData*>(tag)->Proceed();
+       }
+}
diff --git a/clients/grpc-async.h b/clients/grpc-async.h
new file mode 100644 (file)
index 0000000..40de52c
--- /dev/null
@@ -0,0 +1,79 @@
+#pragma once
+
+#include <memory>
+
+#include <grpc/grpc.h>
+#include <grpcpp/grpcpp.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
+#include <grpcpp/server_context.h>
+
+#include <grpcpp/ext/proto_server_reflection_plugin.h>
+#include <grpcpp/health_check_service_interface.h>
+
+#include "shell.h"
+#include "agl_shell.grpc.pb.h"
+
+namespace {
+       const char kDefaultGrpcServiceAddress[] = "127.0.0.1:14005";
+}
+
+class CallData {
+public:
+       // Take in the "service" instance (in this case representing an
+       // asynchronous server) and the completion queue "cq" used for
+       // asynchronous communication with the gRPC runtime.
+       CallData(Greeter::AsyncService* service, grpc::ServerCompletionQueue* cq)
+               : m_service(service), m_cq(cq), m_repliesSent(0), 
+               m_responder(&m_ctx), m_status(CREATE) { Proceed(); }
+       void Proceed();
+private:
+       // The means of communication with the gRPC runtime for an asynchronous
+       // server.
+       Greeter::AsyncService *m_service;
+       // The producer-consumer queue where for asynchronous server
+       // notifications.
+       grpc::ServerCompletionQueue *m_cq;
+       // Context for the rpc, allowing to tweak aspects of it such as the use
+       // of compression, authentication, as well as to send metadata back to
+       // the client.
+       grpc::ServerContext m_ctx;
+
+       // What we send back to the client.
+       ::agl_shell_ipc::AppState m_reply;
+
+       uint32_t m_repliesSent;
+       const uint32_t MAX_REPLIES = 5;
+
+       // The means to get back to the client.
+       grpc::ServerAsyncWriter<::agl_shell_ipc::AppState> m_responder;
+
+       // Let's implement a tiny state machine with the following states.
+       enum CallStatus {
+               CREATE,
+               PROCESS,
+               PROCESSING,
+               FINISH
+       };
+
+       // The current serving state.
+       CallStatus m_status;
+};
+
+
+class GrpcServiceImpl final {
+public:
+       GrpcServiceImpl(Shell *aglShell) : m_aglShell(aglShell) {}
+       ~GrpcServiceImpl();
+       void Run();
+
+       // This can be run in multiple threads if needed.
+       void HandleRpcs();
+
+private:
+       Shell *m_aglShell;
+
+       std::unique_ptr<grpc::ServerCompletionQueue> m_cq;
+       Greeter::AsyncService m_service;
+       std::unique_ptr<grpc::Server> m_server;
+};
diff --git a/clients/grpc-sync.cpp b/clients/grpc-sync.cpp
new file mode 100644 (file)
index 0000000..5aee817
--- /dev/null
@@ -0,0 +1,80 @@
+#include <cstdio>
+#include <ctime>
+#include <algorithm>
+#include <queue>
+
+#include <grpc/grpc.h>
+#include <grpcpp/grpcpp.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
+#include <grpcpp/server_context.h>
+
+#include <grpcpp/ext/proto_server_reflection_plugin.h>
+#include <grpcpp/health_check_service_interface.h>
+
+#include "agl_shell.grpc.pb.h"
+#include "grpc-sync.h"
+
+grpc::ServerUnaryReactor *
+GrpcServiceImpl::ActivateApp(grpc::CallbackServerContext *context,
+                            const ::agl_shell_ipc::ActivateRequest* request,
+                            google::protobuf::Empty* /*response*/)
+{
+       fprintf(stderr, "activating app %s on output %s\n",
+                       request->app_id().c_str(),
+                       request->output_name().c_str());
+
+       m_aglShell->ActivateApp(request->app_id(), request->output_name());
+
+       grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
+       reactor->Finish(grpc::Status::OK);
+       return reactor;
+}
+
+grpc::ServerUnaryReactor *
+GrpcServiceImpl::DeactivateApp(grpc::CallbackServerContext *context,
+                              const ::agl_shell_ipc::DeactivateRequest* request,
+                              google::protobuf::Empty* /*response*/)
+{
+       m_aglShell->DeactivateApp(request->app_id());
+
+       grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
+       reactor->Finish(grpc::Status::OK);
+       return reactor;
+}
+
+grpc::ServerUnaryReactor *
+GrpcServiceImpl::SetAppFloat(grpc::CallbackServerContext *context,
+                            const ::agl_shell_ipc::FloatRequest* request,
+                            google::protobuf::Empty* /* response */)
+{
+       m_aglShell->SetAppFloat(request->app_id());
+
+       grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
+       reactor->Finish(grpc::Status::OK);
+       return reactor;
+}
+
+grpc::ServerUnaryReactor *
+GrpcServiceImpl::SetAppSplit(grpc::CallbackServerContext *context,
+           const ::agl_shell_ipc::SplitRequest* request,
+           google::protobuf::Empty* /*response*/)
+{
+       m_aglShell->SetAppSplit(request->app_id(), request->tile_orientation());
+
+       grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
+       reactor->Finish(grpc::Status::OK);
+       return reactor;
+}
+
+grpc::ServerUnaryReactor *
+GrpcServiceImpl::AppStatusState(grpc::CallbackServerContext *context,
+           google::protobuf::Empty*,
+          ::grpc::ServerWriter<::agl_shell_ipc::AppState>* writer)
+{
+       (void) writer;
+       grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
+       reactor->Finish(grpc::Status::OK);
+
+       return reactor;
+}
diff --git a/clients/grpc-sync.h b/clients/grpc-sync.h
new file mode 100644 (file)
index 0000000..caaee54
--- /dev/null
@@ -0,0 +1,46 @@
+#pragma once
+
+#include <memory>
+
+#include <grpc/grpc.h>
+#include <grpcpp/grpcpp.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
+#include <grpcpp/server_context.h>
+
+#include <grpcpp/ext/proto_server_reflection_plugin.h>
+#include <grpcpp/health_check_service_interface.h>
+
+#include "shell.h"
+#include "agl_shell.grpc.pb.h"
+
+namespace {
+       const char kDefaultGrpcServiceAddress[] = "127.0.0.1:14005";
+}
+
+
+class GrpcServiceImpl final : public agl_shell_ipc::AglShellManagerService::CallbackService {
+public:
+       GrpcServiceImpl(Shell *aglShell) : m_aglShell(aglShell) {}
+
+       grpc::ServerUnaryReactor *ActivateApp(grpc::CallbackServerContext *context,
+                       const ::agl_shell_ipc::ActivateRequest* request,
+                       google::protobuf::Empty* /*response*/);
+
+       grpc::ServerUnaryReactor *DeactivateApp(grpc::CallbackServerContext *context,
+                       const ::agl_shell_ipc::DeactivateRequest* request,
+                       google::protobuf::Empty* /*response*/);
+
+       grpc::ServerUnaryReactor *SetAppSplit(grpc::CallbackServerContext *context,
+                       const ::agl_shell_ipc::SplitRequest* request,
+                       google::protobuf::Empty* /*response*/);
+
+       grpc::ServerUnaryReactor *SetAppFloat(grpc::CallbackServerContext *context,
+                       const ::agl_shell_ipc::FloatRequest* request,
+                       google::protobuf::Empty* /*response*/);
+       grpc::ServerUnaryReactor *AppStatusState(grpc::CallbackServerContext *context,
+                       google::protobuf::Empty *empty,
+                       ::grpc::ServerWriter<::agl_shell_ipc::AppState>* writer);
+private:
+       Shell *m_aglShell;
+};
diff --git a/clients/grpc.h b/clients/grpc.h
deleted file mode 100644 (file)
index 3a3c864..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-#include <grpc/grpc.h>
-#include <grpcpp/grpcpp.h>
-#include <grpcpp/server.h>
-#include <grpcpp/server_builder.h>
-#include <grpcpp/server_context.h>
-
-#include <grpcpp/ext/proto_server_reflection_plugin.h>
-#include <grpcpp/health_check_service_interface.h>
-
-#include "agl_shell.grpc.pb.h"
-#include "agl-shell-client-protocol.h"
-
-namespace {
-       const char kDefaultGrpcServiceAddress[] = "127.0.0.1:14005";
-}
-
-
-class GrpcServiceImpl final : public agl_shell_ipc::AglShellManagerService::CallbackService {
-
-       grpc::ServerUnaryReactor *ActivateApp(grpc::CallbackServerContext *context,
-                       const ::agl_shell_ipc::ActivateRequest* request,
-                       google::protobuf::Empty* /*response*/);
-
-       grpc::ServerUnaryReactor *DeactivateApp(grpc::CallbackServerContext *context,
-                       const ::agl_shell_ipc::DeactivateRequest* request,
-                       google::protobuf::Empty* /*response*/);
-
-       grpc::ServerUnaryReactor *SetAppSplit(grpc::CallbackServerContext *context,
-                       const ::agl_shell_ipc::SplitRequest* request,
-                       google::protobuf::Empty* /*response*/);
-
-       grpc::ServerUnaryReactor *SetAppFloat(grpc::CallbackServerContext *context,
-                       const ::agl_shell_ipc::FloatRequest* request,
-                       google::protobuf::Empty* /*response*/);
-};
-
-
-class Shell {
-public:
-       std::shared_ptr<struct agl_shell> m_shell;
-       Shell(std::shared_ptr<struct agl_shell> shell) : m_shell(shell) { }
-       void ActivateApp(const std::string &app_id, const std::string &output_name);
-       void DeactivateApp(const std::string &app_id);
-       void SetAppSplit(const std::string &app_id, uint32_t orientation);
-       void SetAppFloat(const std::string &app_id);
-
-};
diff --git a/clients/log.h b/clients/log.h
new file mode 100644 (file)
index 0000000..127d8e0
--- /dev/null
@@ -0,0 +1,7 @@
+#pragma once
+
+#include <cstdio>
+
+#ifndef LOG
+#define LOG(fmt, ...) do { fprintf(stderr, "%s() " fmt, __func__, ##__VA_ARGS__); } while (0)
+#endif
similarity index 66%
rename from clients/grpc.cpp
rename to clients/main-grpc.cpp
index 503453d..5f35c85 100644 (file)
 #include <cstdio>
 #include <ctime>
 #include <algorithm>
+#include <queue>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
 
-#include "grpc.h"
+#include "shell.h"
+#include "log.h"
+#include "main-grpc.h"
+#include "grpc-async-cb.h"
 
-struct shell_data {
-       struct wl_display *wl_display;
-       struct agl_shell *shell;
-       struct agl_shell_ext *shell_ext;
-       Shell *aglShell;
-
-       bool wait_for_bound;
-       bool wait_for_doas;
-
-       bool bound_ok;
-       bool doas_ok;
-
-       uint32_t version;
-       struct wl_list output_list;     /** window_output::link */
-};
-
-struct window_output {
-       struct shell_data *shell_data;
-       struct wl_output *output;
-       char *name;
-       struct wl_list link;    /** display::output_list */
-};
-
-static struct shell_data *sh = nullptr;
-
-grpc::ServerUnaryReactor *
-GrpcServiceImpl::ActivateApp(grpc::CallbackServerContext *context,
-                            const ::agl_shell_ipc::ActivateRequest* request,
-                            google::protobuf::Empty* /*response*/)
-{
-       fprintf(stderr, "activating app %s on output %s\n",
-                       request->app_id().c_str(),
-                       request->output_name().c_str());
-
-       sh->aglShell->ActivateApp(request->app_id(), request->output_name());
-
-       grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
-       reactor->Finish(grpc::Status::OK);
-       return reactor;
-}
-
-grpc::ServerUnaryReactor *
-GrpcServiceImpl::DeactivateApp(grpc::CallbackServerContext *context,
-                              const ::agl_shell_ipc::DeactivateRequest* request,
-                              google::protobuf::Empty* /*response*/)
-{
-       sh->aglShell->DeactivateApp(request->app_id());
-
-       grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
-       reactor->Finish(grpc::Status::OK);
-       return reactor;
-}
-
-grpc::ServerUnaryReactor *
-GrpcServiceImpl::SetAppFloat(grpc::CallbackServerContext *context,
-                            const ::agl_shell_ipc::FloatRequest* request,
-                            google::protobuf::Empty* /* response */)
-{
-       sh->aglShell->SetAppFloat(request->app_id());
-
-       grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
-       reactor->Finish(grpc::Status::OK);
-       return reactor;
-}
-
-grpc::ServerUnaryReactor *
-GrpcServiceImpl::SetAppSplit(grpc::CallbackServerContext *context,
-           const ::agl_shell_ipc::SplitRequest* request,
-           google::protobuf::Empty* /*response*/)
-{
-       sh->aglShell->SetAppSplit(request->app_id(), request->tile_orientation());
-
-       grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
-       reactor->Finish(grpc::Status::OK);
-       return reactor;
-}
-
-void
-Shell::ActivateApp(const std::string &app_id, const std::string &output_name)
-{
-       struct window_output *woutput, *w_output;
-
-       woutput = nullptr;
-       w_output = nullptr;
-
-       wl_list_for_each(woutput, &sh->output_list, link) {
-               if (woutput->name && !strcmp(woutput->name, output_name.c_str())) {
-                       w_output = woutput;
-                       break;
-               }
-       }
-
-       // else, get the first one available
-       if (!w_output)
-               w_output = wl_container_of(sh->output_list.prev, w_output, link);
-
-       agl_shell_activate_app(this->m_shell.get(), app_id.c_str(), w_output->output);
-       wl_display_flush(sh->wl_display);
-}
-
-void
-Shell::DeactivateApp(const std::string &app_id)
-{
-       (void) app_id;
-}
-
-void
-Shell::SetAppFloat(const std::string &app_id)
-{
-       (void) app_id;
-}
-
-void
-Shell::SetAppSplit(const std::string &app_id, uint32_t orientation)
-{
-       (void) app_id;
-       (void) orientation;
-}
-
-static void
-start_grpc_server(void)
-{
-       // instantiante the grpc server
-       std::string server_address(kDefaultGrpcServiceAddress);
-       GrpcServiceImpl service;
-
-       grpc::EnableDefaultHealthCheckService(true);
-       grpc::reflection::InitProtoReflectionServerBuilderPlugin();
-
-       grpc::ServerBuilder builder;
-       builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
-       builder.RegisterService(&service);
-
-       std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
-       fprintf(stderr, "Server listening on %s\n", server_address.c_str());
-
-       server->Wait();
-}
+static int running = 1;
 
 static void
 agl_shell_bound_ok(void *data, struct agl_shell *agl_shell)
@@ -169,10 +39,24 @@ static void
 agl_shell_app_state(void *data, struct agl_shell *agl_shell,
                const char *app_id, uint32_t state)
 {
-       (void) data;
        (void) agl_shell;
-       (void) app_id;
-       (void) state;
+       struct shell_data *sh = static_cast<struct shell_data *>(data);
+       LOG("got app_state event app_id %s,  state %d\n", app_id, state);
+
+       if (sh->server_context_list.empty())
+               return;
+
+       ::agl_shell_ipc::AppState app;
+
+       sh->current_app_state.set_app_id(std::string(app_id));
+       sh->current_app_state.set_state(state);
+
+       auto start = sh->server_context_list.begin();
+       while (start != sh->server_context_list.end()) {
+               LOG("writing to lister %p\n", static_cast<void *>(start->second));
+               start->second->NextWrite();
+               start++;
+       }
 }
 
 static const struct agl_shell_listener shell_listener = {
@@ -294,7 +178,6 @@ destroy_output(struct window_output *w_output)
        free(w_output);
 }
 
-
 static void
 global_add(void *data, struct wl_registry *reg, uint32_t id,
                const char *interface, uint32_t version)
@@ -340,7 +223,8 @@ global_add_ext(void *data, struct wl_registry *reg, uint32_t id,
                        static_cast<struct agl_shell_ext *>(wl_registry_bind(reg, id,
                                        &agl_shell_ext_interface, std::min(static_cast<uint32_t>(1),
                                                                           version)));
-               agl_shell_ext_add_listener(sh->shell_ext, &shell_ext_listener, data);
+               agl_shell_ext_add_listener(sh->shell_ext,
+                                          &shell_ext_listener, data);
        }
 }
 
@@ -364,7 +248,7 @@ static const struct wl_registry_listener registry_listener = {
 };
 
 static void
-register_shell_ext(struct wl_display *wl_display)
+register_shell_ext(struct wl_display *wl_display, struct shell_data *sh)
 {
        struct wl_registry *registry;
 
@@ -377,7 +261,7 @@ register_shell_ext(struct wl_display *wl_display)
 }
 
 static void
-register_shell(struct wl_display *wl_display)
+register_shell(struct wl_display *wl_display, struct shell_data *sh)
 {
        struct wl_registry *registry;
 
@@ -391,7 +275,21 @@ register_shell(struct wl_display *wl_display)
        wl_registry_destroy(registry);
 }
 
-static int
+static void
+destroy_shell_data(struct shell_data *sh)
+{
+        struct window_output *w_output, *w_output_next;
+
+        wl_list_for_each_safe(w_output, w_output_next, &sh->output_list, link)
+                destroy_output(w_output);
+
+        wl_display_flush(sh->wl_display);
+        wl_display_disconnect(sh->wl_display);
+
+       delete sh;
+}
+
+static struct shell_data *
 start_agl_shell_client(void)
 {
        int ret = 0;
@@ -399,22 +297,25 @@ start_agl_shell_client(void)
 
        wl_display = wl_display_connect(NULL);
 
-       sh = new struct shell_data;
+       struct shell_data *sh = new struct shell_data;
+
        sh->wl_display = wl_display;
        sh->wait_for_doas = true;
        sh->wait_for_bound = true;
 
-       register_shell_ext(wl_display);
+       register_shell_ext(wl_display, sh);
 
        // check for agl_shell_ext
        if (!sh->shell_ext) {
                fprintf(stderr, "Failed to bind to agl_shell_ext interface\n");
-               return -1;
+               delete sh;
+               return nullptr;
        }
 
        if (wl_list_empty(&sh->output_list)) {
                fprintf(stderr, "Failed get any outputs!\n");
-               return -1;
+               delete sh;
+               return nullptr;
        }
 
        agl_shell_ext_doas_shell_client(sh->shell_ext);
@@ -426,11 +327,12 @@ start_agl_shell_client(void)
 
        if (!sh->doas_ok) {
                fprintf(stderr, "Failed to get doas_done event\n");
-               return -1;
+               delete sh;
+               return nullptr;
        }
 
        // bind to agl-shell
-       register_shell(wl_display);
+       register_shell(wl_display, sh);
        while (ret != -1 && sh->wait_for_bound) {
                ret = wl_display_dispatch(sh->wl_display);
                if (sh->wait_for_bound)
@@ -440,34 +342,40 @@ start_agl_shell_client(void)
        // at this point, we can't do anything about it
        if (!sh->bound_ok) {
                fprintf(stderr, "Failed to get bound_ok event!\n");
-               return -1;
+               delete sh;
+               return nullptr;
        }
 
        fprintf(stderr, "agl_shell/agl_shell_ext interface OK\n");
-       std::shared_ptr<struct agl_shell> agl_shell{sh->shell, agl_shell_destroy};
-       sh->aglShell = new Shell(agl_shell);
 
-       return 0;
+       return sh; 
 }
 
 static void
-destroy_shell_data(void)
+start_grpc_server(Shell *aglShell)
 {
-        struct window_output *w_output, *w_output_next;
+       // instantiante the grpc server
+       std::string server_address(kDefaultGrpcServiceAddress);
+       GrpcServiceImpl service{aglShell};
 
-        wl_list_for_each_safe(w_output, w_output_next, &sh->output_list, link)
-                destroy_output(w_output);
+       grpc::EnableDefaultHealthCheckService(true);
+       grpc::reflection::InitProtoReflectionServerBuilderPlugin();
 
-        wl_display_flush(sh->wl_display);
-        wl_display_disconnect(sh->wl_display);
+       grpc::ServerBuilder builder;
+       builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
+       builder.RegisterService(&service);
 
-       delete sh;
+       std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
+       LOG("Server listening on %s\n", server_address.c_str());
+
+       server->Wait();
 }
 
 int main(int argc, char **argv)
 {
        (void) argc;
        (void) argv;
+       Shell *aglShell;
        int ret = 0;
 
        // do not start right up, give shell client time to boot up
@@ -479,14 +387,22 @@ int main(int argc, char **argv)
 
        nanosleep(&ts, NULL);
 
-       ret = start_agl_shell_client();
-       if (ret) {
-               fprintf(stderr, "Failed to initialize agl-shell/agl-shell-ext\n");
+       struct shell_data *sh = start_agl_shell_client();
+       if (!sh) {
+               LOG("Failed to initialize agl-shell/agl-shell-ext\n");
                exit(EXIT_FAILURE);
        }
 
-       start_grpc_server();
+       std::shared_ptr<struct agl_shell> agl_shell{sh->shell, agl_shell_destroy};
+       aglShell = new Shell(agl_shell, sh);
+
+       std::thread thread(start_grpc_server, aglShell);
+
+       // serve wayland requests
+       while (running && ret != -1) {
+               ret = wl_display_dispatch(sh->wl_display);
+       }
 
-       destroy_shell_data();
+       destroy_shell_data(sh);
        return 0;
 }
diff --git a/clients/main-grpc.h b/clients/main-grpc.h
new file mode 100644 (file)
index 0000000..9b687e9
--- /dev/null
@@ -0,0 +1,38 @@
+#pragma once
+
+#include <cstdio>
+#include <algorithm>
+#include <queue>
+#include <mutex>
+#include <condition_variable>
+#include <wayland-client.h>
+
+#include "agl_shell.grpc.pb.h"
+
+// forward declaration created in grpc-async-cb
+class Lister;
+
+struct shell_data {
+       struct wl_display *wl_display;
+       struct agl_shell *shell;
+       struct agl_shell_ext *shell_ext;
+
+       bool wait_for_bound;
+       bool wait_for_doas;
+
+       bool bound_ok;
+       bool doas_ok;
+
+       uint32_t version;
+       struct wl_list output_list;     /** window_output::link */
+
+       ::agl_shell_ipc::AppState current_app_state;
+       std::list<std::pair<grpc::CallbackServerContext*, Lister *> > server_context_list;
+};
+
+struct window_output {
+       struct shell_data *shell_data;
+       struct wl_output *output;
+       char *name;
+       struct wl_list link;    /** display::output_list */
+};
index ff27a60..648c95a 100644 (file)
@@ -46,7 +46,9 @@ clients = [
 {
     'basename': 'agl-shell-grpc-server',
     'sources': [
-      'grpc.cpp',
+      'main-grpc.cpp',
+      'grpc-async-cb.cpp',
+      'shell.cpp',
       generated_protoc_sources,
       generated_grpc_sources,
       agl_shell_client_protocol_h,
diff --git a/clients/shell.cpp b/clients/shell.cpp
new file mode 100644 (file)
index 0000000..343b36e
--- /dev/null
@@ -0,0 +1,53 @@
+#include <cstdio>
+#include <ctime>
+#include <algorithm>
+#include <cstring>
+#include <string>
+#include <queue>
+
+#include "main-grpc.h"
+#include "shell.h"
+
+void
+Shell::ActivateApp(const std::string &app_id, const std::string &output_name)
+{
+       struct window_output *woutput, *w_output;
+       struct agl_shell *shell = this->m_shell.get();
+
+       woutput = nullptr;
+       w_output = nullptr;
+
+       wl_list_for_each(woutput, &m_shell_data->output_list, link) {
+               if (woutput->name && !strcmp(woutput->name, output_name.c_str())) {
+                       w_output = woutput;
+                       break;
+               }
+       }
+
+       // else, get the first one available
+       if (!w_output)
+               w_output = wl_container_of(m_shell_data->output_list.prev,
+                                          w_output, link);
+
+       agl_shell_activate_app(shell, app_id.c_str(), w_output->output);
+       wl_display_flush(m_shell_data->wl_display);
+}
+
+void
+Shell::DeactivateApp(const std::string &app_id)
+{
+       (void) app_id;
+}
+
+void
+Shell::SetAppFloat(const std::string &app_id)
+{
+       (void) app_id;
+}
+
+void
+Shell::SetAppSplit(const std::string &app_id, uint32_t orientation)
+{
+       (void) app_id;
+       (void) orientation;
+}
diff --git a/clients/shell.h b/clients/shell.h
new file mode 100644 (file)
index 0000000..1cdbd1d
--- /dev/null
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <memory>
+
+#include "agl-shell-client-protocol.h"
+
+#include "main-grpc.h"
+
+class Shell {
+public:
+       std::shared_ptr<struct agl_shell> m_shell;
+       struct shell_data *m_shell_data;
+
+       Shell(std::shared_ptr<struct agl_shell> shell,
+             struct shell_data *sh_data) :
+               m_shell(shell), m_shell_data(sh_data) { }
+       void ActivateApp(const std::string &app_id, const std::string &output_name);
+       void DeactivateApp(const std::string &app_id);
+       void SetAppSplit(const std::string &app_id, uint32_t orientation);
+       void SetAppFloat(const std::string &app_id);
+};
index 721fac2..414162b 100644 (file)
@@ -3,10 +3,11 @@ import "google/protobuf/empty.proto";
 package agl_shell_ipc;
 
 service AglShellManagerService {
-       rpc ActivateApp(ActivateRequest)        returns (google.protobuf.Empty) {}
-       rpc DeactivateApp(DeactivateRequest)    returns (google.protobuf.Empty) {}
-       rpc SetAppSplit(SplitRequest)           returns (google.protobuf.Empty) {}
-       rpc SetAppFloat(FloatRequest)           returns (google.protobuf.Empty) {}
+       rpc ActivateApp(ActivateRequest)                        returns (google.protobuf.Empty) {}
+       rpc DeactivateApp(DeactivateRequest)            returns (google.protobuf.Empty) {}
+       rpc SetAppSplit(SplitRequest)                   returns (google.protobuf.Empty) {}
+       rpc SetAppFloat(FloatRequest)                   returns (google.protobuf.Empty) {}
+       rpc AppStatusState(google.protobuf.Empty)       returns (stream AppState) {}
 }
 
 message ActivateRequest {
@@ -27,3 +28,7 @@ message FloatRequest {
        string app_id = 1;
 }
 
+message AppState {
+       int32 state = 1;
+       string app_id = 2;
+}
index 889c3f3..891c093 100644 (file)
@@ -86,8 +86,6 @@ struct ivi_compositor {
        struct {
                struct wl_client *client;
                struct wl_resource *resource;
-
-               struct wl_client *client_ext;
                struct wl_resource *resource_ext;
                bool ready;
                enum agl_shell_bound_status status;
index f8d2e32..d583fe5 100644 (file)
@@ -1145,12 +1145,16 @@ shell_send_app_state(struct ivi_compositor *ivi, const char *app_id,
        if (app_id && wl_resource_get_version(ivi->shell_client.resource) >=
            AGL_SHELL_APP_STATE_SINCE_VERSION) {
 
+               weston_log("%s() should sent app_state\n", __func__);
                agl_shell_send_app_state(ivi->shell_client.resource,
                                         app_id, state);
 
-               if (ivi->shell_client_ext.resource)
-                       agl_shell_send_app_state(ivi->shell_client_ext.resource,
+               if (ivi->shell_client.resource_ext) {
+                       weston_log("%s() 2. should sent app_state %p\n", 
+                                       __func__, ivi->shell_client.resource_ext);
+                       agl_shell_send_app_state(ivi->shell_client.resource_ext,
                                                 app_id, state);
+               }
        }
 }
 
@@ -1634,13 +1638,13 @@ bind_agl_shell(struct wl_client *client,
 
                        wl_resource_set_implementation(resource, &agl_shell_implementation,
                                                       ivi, NULL);
-                       ivi->shell_client_ext.resource = resource;
+                       ivi->shell_client.resource_ext = resource;
 
                        if (ivi->shell_client.status == BOUND_OK &&
                            wl_resource_get_version(resource) >= AGL_SHELL_BOUND_OK_SINCE_VERSION) {
-                               weston_log("Sent agl_shell_send_bound_ok to client ext\n");
                                ivi->shell_client_ext.status = BOUND_OK;
-                               agl_shell_send_bound_ok(ivi->shell_client_ext.resource);
+                               agl_shell_send_bound_ok(ivi->shell_client.resource_ext);
+                               weston_log("Sent agl_shell_send_bound_ok to client ext %p\n", ivi->shell_client.resource_ext);
                        }
 
                        return;