From: Marius Vlad Date: Tue, 18 Oct 2022 17:22:47 +0000 (+0300) Subject: Add more grpc - Asyncstuff X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=commitdiff_plain;h=33aea95f02b025ecaf5917b10975119c07c353b3;p=src%2Fagl-compositor.git Add more grpc - Asyncstuff Switch to a more better structure protocol: Add support for sending out app_state events over gRPC Signed-off-by: Marius Vlad Change-Id: I2765d53a2123be0d52225d92c964d39c63ec4902 --- diff --git a/clients/grpc-async-cb.cpp b/clients/grpc-async-cb.cpp new file mode 100644 index 0000000..71cd57e --- /dev/null +++ b/clients/grpc-async-cb.cpp @@ -0,0 +1,111 @@ +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include "log.h" +#include "agl_shell.grpc.pb.h" +#include "grpc-async-cb.h" + +Lister::Lister(Shell *shell) : m_shell(shell) +{ + // don't call NextWrite() just yet we do it explicitly when getting + // the events from the compositor +} + +void +Lister::OnDone() +{ + delete this; +} + +void Lister::OnWriteDone(bool ok) +{ + LOG("ok %d\n", ok); + if (ok) { + // normally we should finish here, but we don't do that to keep + // the channel open + //Finish(grpc::Status::OK); + } +} + +void +Lister::NextWrite(void) +{ + // we're going to have a Lister instance per client so we're safe here + StartWrite(&m_shell->m_shell_data->current_app_state); +} + +grpc::ServerUnaryReactor * +GrpcServiceImpl::ActivateApp(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::ActivateRequest* request, + google::protobuf::Empty* /*response*/) +{ + LOG("activating app %s on output %s\n", request->app_id().c_str(), + request->output_name().c_str()); + + m_aglShell->ActivateApp(request->app_id(), request->output_name()); + + grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; +} + +grpc::ServerUnaryReactor * +GrpcServiceImpl::DeactivateApp(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::DeactivateRequest* request, + google::protobuf::Empty* /*response*/) +{ + m_aglShell->DeactivateApp(request->app_id()); + + grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; +} + +grpc::ServerUnaryReactor * +GrpcServiceImpl::SetAppFloat(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::FloatRequest* request, + google::protobuf::Empty* /* response */) +{ + m_aglShell->SetAppFloat(request->app_id()); + + grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; +} + +grpc::ServerUnaryReactor * +GrpcServiceImpl::SetAppSplit(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::SplitRequest* request, + google::protobuf::Empty* /*response*/) +{ + m_aglShell->SetAppSplit(request->app_id(), request->tile_orientation()); + + grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; +} + +grpc::ServerWriteReactor<::agl_shell_ipc::AppState>* +GrpcServiceImpl::AppStatusState(grpc::CallbackServerContext* context, + const google::protobuf::Empty*) +{ + + Lister *n = new Lister(m_aglShell); + + m_aglShell->m_shell_data->server_context_list.push_back(std::pair(context, n)); + LOG("added lister %p\n", static_cast(n)); + + // just return a Lister to keep the channel open + return n; +} diff --git a/clients/grpc-async-cb.h b/clients/grpc-async-cb.h new file mode 100644 index 0000000..ce89f68 --- /dev/null +++ b/clients/grpc-async-cb.h @@ -0,0 +1,64 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include "shell.h" +#include "agl_shell.grpc.pb.h" + +namespace { + const char kDefaultGrpcServiceAddress[] = "127.0.0.1:14005"; +} + +class Lister : public grpc::ServerWriteReactor<::agl_shell_ipc::AppState> { +public: + Lister(Shell *aglShell); + void OnDone() override; + void OnWriteDone(bool ok) override; + void NextWrite(void); +private: + Shell *m_shell; +}; + +class GrpcServiceImpl final : public agl_shell_ipc::AglShellManagerService::CallbackService { +public: + GrpcServiceImpl(Shell *aglShell) : m_aglShell(aglShell) {} + + grpc::ServerUnaryReactor *ActivateApp(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::ActivateRequest* request, + google::protobuf::Empty* /*response*/) override; + + grpc::ServerUnaryReactor *DeactivateApp(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::DeactivateRequest* request, + google::protobuf::Empty* /*response*/) override; + + grpc::ServerUnaryReactor *SetAppSplit(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::SplitRequest* request, + google::protobuf::Empty* /*response*/) override; + + grpc::ServerUnaryReactor *SetAppFloat(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::FloatRequest* request, + google::protobuf::Empty* /*response*/) override; + + grpc::ServerWriteReactor< ::agl_shell_ipc::AppState>* AppStatusState( + ::grpc::CallbackServerContext* /*context*/, + const ::google::protobuf::Empty* /*request*/) override; +private: + Shell *m_aglShell; + + std::mutex m_done_mutex; + std::condition_variable m_done_cv; + bool m_done = false; + +}; diff --git a/clients/grpc-async.cpp b/clients/grpc-async.cpp new file mode 100644 index 0000000..72090fe --- /dev/null +++ b/clients/grpc-async.cpp @@ -0,0 +1,121 @@ +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include "agl_shell.grpc.pb.h" +#include "grpc-async.h" + +void +CallData::Proceed(void) +{ + switch (m_status) { + case CREATE: + // Make this instance progress to the PROCESS state. + m_status = PROCESS; + std::cout << "Creating Call data for new client connections: " + << this << std::endl; + + // As part of the initial CREATE state, we *request* that the + // system start processing AppStatusState requests. + // + // In this request, "this" acts are the tag uniquely + // identifying the request (so that different CallData + // instances can serve different requests concurrently), in + // this case the memory address of this CallData instance. + m_service->RequestAppStatusState(&m_ctx, &m_request, &m_responder, + m_cq, m_cq, (void *) this); + break; + case PROCESS: + // Spawn a new CallData instance to serve new clients while we + // process the one for this CallData. The instance will + // deallocate itself as part of its FINISH state. + CallData *cd = new CallData(m_service, m_cq); + + // The actual processing. + m_status = PROCESSING; + m_repliesSent++; + break; + case PROCESSING: + if (m_repliesSent == MAX_REPLIES) { + // And we are done! Let the gRPC runtime know we've + // finished, using the memory address of this instance + // as the uniquely identifying tag for the event. + m_status = FINISH; + m_responder.Finish(Status::OK, this); + } else { + // The actual processing. + m_status = PROCESSING; + m_repliesSent++; + } + break; + case FINISH: + GPR_ASSERT(m_status == FINISH); + std::cout << "Completed RPC for: " << this << std::endl; + // Once in the FINISH state, deallocate ourselves (CallData). + delete this; + break; + default: + break; + } +} + +GrpcServiceImpl::~GrpcServiceImpl() +{ + m_server->Shutdown(); + // Always shutdown the completion queue after the server. + m_cq->Shutdown(); +} + +void +GrpcServiceImpl::Run(void) +{ + std::string server_address(kDefaultGrpcServiceAddress); + + grpc::ServerBuilder builder; + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + + builder.RegisterService(&m_service); + m_cq = builder.AddCompletionQueue(); + + m_server = builder.BuildAndStart(); + std::cout << "Server listening on " << server_address << std::endl; + + // Proceed to the server's main loop. + HandleRpcs(); +} + +void +GrpcServiceImpl::HandleRpcs(void) +{ + // Spawn a new CallData instance to serve new clients. + CallData *cd = new CallData(&m_service, m_cq.get()); + + // uniquely identifies a request. + void *tag; + bool ok; + + // Block waiting to read the next event from the completion queue. The + // event is uniquely identified by its tag, which in this case is the + // memory address of a CallData instance. + // + // The return value of Next should always be checked. This return value + // tells us whether there is any kind of event or cq_ is shutting down. + while (true) { + std::cout << "Blocked on next waiting for events" << std::endl; + GPR_ASSERT(m_cq->Next(&tag, &ok)); + GPR_ASSERT(ok); + + std::cout << "Calling tag " << tag << " with Proceed()" << std::endl; + static_cast(tag)->Proceed(); + } +} diff --git a/clients/grpc-async.h b/clients/grpc-async.h new file mode 100644 index 0000000..40de52c --- /dev/null +++ b/clients/grpc-async.h @@ -0,0 +1,79 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include "shell.h" +#include "agl_shell.grpc.pb.h" + +namespace { + const char kDefaultGrpcServiceAddress[] = "127.0.0.1:14005"; +} + +class CallData { +public: + // Take in the "service" instance (in this case representing an + // asynchronous server) and the completion queue "cq" used for + // asynchronous communication with the gRPC runtime. + CallData(Greeter::AsyncService* service, grpc::ServerCompletionQueue* cq) + : m_service(service), m_cq(cq), m_repliesSent(0), + m_responder(&m_ctx), m_status(CREATE) { Proceed(); } + void Proceed(); +private: + // The means of communication with the gRPC runtime for an asynchronous + // server. + Greeter::AsyncService *m_service; + // The producer-consumer queue where for asynchronous server + // notifications. + grpc::ServerCompletionQueue *m_cq; + // Context for the rpc, allowing to tweak aspects of it such as the use + // of compression, authentication, as well as to send metadata back to + // the client. + grpc::ServerContext m_ctx; + + // What we send back to the client. + ::agl_shell_ipc::AppState m_reply; + + uint32_t m_repliesSent; + const uint32_t MAX_REPLIES = 5; + + // The means to get back to the client. + grpc::ServerAsyncWriter<::agl_shell_ipc::AppState> m_responder; + + // Let's implement a tiny state machine with the following states. + enum CallStatus { + CREATE, + PROCESS, + PROCESSING, + FINISH + }; + + // The current serving state. + CallStatus m_status; +}; + + +class GrpcServiceImpl final { +public: + GrpcServiceImpl(Shell *aglShell) : m_aglShell(aglShell) {} + ~GrpcServiceImpl(); + void Run(); + + // This can be run in multiple threads if needed. + void HandleRpcs(); + +private: + Shell *m_aglShell; + + std::unique_ptr m_cq; + Greeter::AsyncService m_service; + std::unique_ptr m_server; +}; diff --git a/clients/grpc-sync.cpp b/clients/grpc-sync.cpp new file mode 100644 index 0000000..5aee817 --- /dev/null +++ b/clients/grpc-sync.cpp @@ -0,0 +1,80 @@ +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include "agl_shell.grpc.pb.h" +#include "grpc-sync.h" + +grpc::ServerUnaryReactor * +GrpcServiceImpl::ActivateApp(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::ActivateRequest* request, + google::protobuf::Empty* /*response*/) +{ + fprintf(stderr, "activating app %s on output %s\n", + request->app_id().c_str(), + request->output_name().c_str()); + + m_aglShell->ActivateApp(request->app_id(), request->output_name()); + + grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; +} + +grpc::ServerUnaryReactor * +GrpcServiceImpl::DeactivateApp(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::DeactivateRequest* request, + google::protobuf::Empty* /*response*/) +{ + m_aglShell->DeactivateApp(request->app_id()); + + grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; +} + +grpc::ServerUnaryReactor * +GrpcServiceImpl::SetAppFloat(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::FloatRequest* request, + google::protobuf::Empty* /* response */) +{ + m_aglShell->SetAppFloat(request->app_id()); + + grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; +} + +grpc::ServerUnaryReactor * +GrpcServiceImpl::SetAppSplit(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::SplitRequest* request, + google::protobuf::Empty* /*response*/) +{ + m_aglShell->SetAppSplit(request->app_id(), request->tile_orientation()); + + grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; +} + +grpc::ServerUnaryReactor * +GrpcServiceImpl::AppStatusState(grpc::CallbackServerContext *context, + google::protobuf::Empty*, + ::grpc::ServerWriter<::agl_shell_ipc::AppState>* writer) +{ + (void) writer; + grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + + return reactor; +} diff --git a/clients/grpc-sync.h b/clients/grpc-sync.h new file mode 100644 index 0000000..caaee54 --- /dev/null +++ b/clients/grpc-sync.h @@ -0,0 +1,46 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include "shell.h" +#include "agl_shell.grpc.pb.h" + +namespace { + const char kDefaultGrpcServiceAddress[] = "127.0.0.1:14005"; +} + + +class GrpcServiceImpl final : public agl_shell_ipc::AglShellManagerService::CallbackService { +public: + GrpcServiceImpl(Shell *aglShell) : m_aglShell(aglShell) {} + + grpc::ServerUnaryReactor *ActivateApp(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::ActivateRequest* request, + google::protobuf::Empty* /*response*/); + + grpc::ServerUnaryReactor *DeactivateApp(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::DeactivateRequest* request, + google::protobuf::Empty* /*response*/); + + grpc::ServerUnaryReactor *SetAppSplit(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::SplitRequest* request, + google::protobuf::Empty* /*response*/); + + grpc::ServerUnaryReactor *SetAppFloat(grpc::CallbackServerContext *context, + const ::agl_shell_ipc::FloatRequest* request, + google::protobuf::Empty* /*response*/); + grpc::ServerUnaryReactor *AppStatusState(grpc::CallbackServerContext *context, + google::protobuf::Empty *empty, + ::grpc::ServerWriter<::agl_shell_ipc::AppState>* writer); +private: + Shell *m_aglShell; +}; diff --git a/clients/grpc.h b/clients/grpc.h deleted file mode 100644 index 3a3c864..0000000 --- a/clients/grpc.h +++ /dev/null @@ -1,47 +0,0 @@ -#include -#include -#include -#include -#include - -#include -#include - -#include "agl_shell.grpc.pb.h" -#include "agl-shell-client-protocol.h" - -namespace { - const char kDefaultGrpcServiceAddress[] = "127.0.0.1:14005"; -} - - -class GrpcServiceImpl final : public agl_shell_ipc::AglShellManagerService::CallbackService { - - grpc::ServerUnaryReactor *ActivateApp(grpc::CallbackServerContext *context, - const ::agl_shell_ipc::ActivateRequest* request, - google::protobuf::Empty* /*response*/); - - grpc::ServerUnaryReactor *DeactivateApp(grpc::CallbackServerContext *context, - const ::agl_shell_ipc::DeactivateRequest* request, - google::protobuf::Empty* /*response*/); - - grpc::ServerUnaryReactor *SetAppSplit(grpc::CallbackServerContext *context, - const ::agl_shell_ipc::SplitRequest* request, - google::protobuf::Empty* /*response*/); - - grpc::ServerUnaryReactor *SetAppFloat(grpc::CallbackServerContext *context, - const ::agl_shell_ipc::FloatRequest* request, - google::protobuf::Empty* /*response*/); -}; - - -class Shell { -public: - std::shared_ptr m_shell; - Shell(std::shared_ptr shell) : m_shell(shell) { } - void ActivateApp(const std::string &app_id, const std::string &output_name); - void DeactivateApp(const std::string &app_id); - void SetAppSplit(const std::string &app_id, uint32_t orientation); - void SetAppFloat(const std::string &app_id); - -}; diff --git a/clients/log.h b/clients/log.h new file mode 100644 index 0000000..127d8e0 --- /dev/null +++ b/clients/log.h @@ -0,0 +1,7 @@ +#pragma once + +#include + +#ifndef LOG +#define LOG(fmt, ...) do { fprintf(stderr, "%s() " fmt, __func__, ##__VA_ARGS__); } while (0) +#endif diff --git a/clients/grpc.cpp b/clients/main-grpc.cpp similarity index 66% rename from clients/grpc.cpp rename to clients/main-grpc.cpp index 503453d..5f35c85 100644 --- a/clients/grpc.cpp +++ b/clients/main-grpc.cpp @@ -1,147 +1,17 @@ #include #include #include +#include +#include +#include +#include -#include "grpc.h" +#include "shell.h" +#include "log.h" +#include "main-grpc.h" +#include "grpc-async-cb.h" -struct shell_data { - struct wl_display *wl_display; - struct agl_shell *shell; - struct agl_shell_ext *shell_ext; - Shell *aglShell; - - bool wait_for_bound; - bool wait_for_doas; - - bool bound_ok; - bool doas_ok; - - uint32_t version; - struct wl_list output_list; /** window_output::link */ -}; - -struct window_output { - struct shell_data *shell_data; - struct wl_output *output; - char *name; - struct wl_list link; /** display::output_list */ -}; - -static struct shell_data *sh = nullptr; - -grpc::ServerUnaryReactor * -GrpcServiceImpl::ActivateApp(grpc::CallbackServerContext *context, - const ::agl_shell_ipc::ActivateRequest* request, - google::protobuf::Empty* /*response*/) -{ - fprintf(stderr, "activating app %s on output %s\n", - request->app_id().c_str(), - request->output_name().c_str()); - - sh->aglShell->ActivateApp(request->app_id(), request->output_name()); - - grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); - reactor->Finish(grpc::Status::OK); - return reactor; -} - -grpc::ServerUnaryReactor * -GrpcServiceImpl::DeactivateApp(grpc::CallbackServerContext *context, - const ::agl_shell_ipc::DeactivateRequest* request, - google::protobuf::Empty* /*response*/) -{ - sh->aglShell->DeactivateApp(request->app_id()); - - grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); - reactor->Finish(grpc::Status::OK); - return reactor; -} - -grpc::ServerUnaryReactor * -GrpcServiceImpl::SetAppFloat(grpc::CallbackServerContext *context, - const ::agl_shell_ipc::FloatRequest* request, - google::protobuf::Empty* /* response */) -{ - sh->aglShell->SetAppFloat(request->app_id()); - - grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); - reactor->Finish(grpc::Status::OK); - return reactor; -} - -grpc::ServerUnaryReactor * -GrpcServiceImpl::SetAppSplit(grpc::CallbackServerContext *context, - const ::agl_shell_ipc::SplitRequest* request, - google::protobuf::Empty* /*response*/) -{ - sh->aglShell->SetAppSplit(request->app_id(), request->tile_orientation()); - - grpc::ServerUnaryReactor* reactor = context->DefaultReactor(); - reactor->Finish(grpc::Status::OK); - return reactor; -} - -void -Shell::ActivateApp(const std::string &app_id, const std::string &output_name) -{ - struct window_output *woutput, *w_output; - - woutput = nullptr; - w_output = nullptr; - - wl_list_for_each(woutput, &sh->output_list, link) { - if (woutput->name && !strcmp(woutput->name, output_name.c_str())) { - w_output = woutput; - break; - } - } - - // else, get the first one available - if (!w_output) - w_output = wl_container_of(sh->output_list.prev, w_output, link); - - agl_shell_activate_app(this->m_shell.get(), app_id.c_str(), w_output->output); - wl_display_flush(sh->wl_display); -} - -void -Shell::DeactivateApp(const std::string &app_id) -{ - (void) app_id; -} - -void -Shell::SetAppFloat(const std::string &app_id) -{ - (void) app_id; -} - -void -Shell::SetAppSplit(const std::string &app_id, uint32_t orientation) -{ - (void) app_id; - (void) orientation; -} - -static void -start_grpc_server(void) -{ - // instantiante the grpc server - std::string server_address(kDefaultGrpcServiceAddress); - GrpcServiceImpl service; - - grpc::EnableDefaultHealthCheckService(true); - grpc::reflection::InitProtoReflectionServerBuilderPlugin(); - - grpc::ServerBuilder builder; - builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); - builder.RegisterService(&service); - - std::unique_ptr server(builder.BuildAndStart()); - fprintf(stderr, "Server listening on %s\n", server_address.c_str()); - - server->Wait(); -} +static int running = 1; static void agl_shell_bound_ok(void *data, struct agl_shell *agl_shell) @@ -169,10 +39,24 @@ static void agl_shell_app_state(void *data, struct agl_shell *agl_shell, const char *app_id, uint32_t state) { - (void) data; (void) agl_shell; - (void) app_id; - (void) state; + struct shell_data *sh = static_cast(data); + LOG("got app_state event app_id %s, state %d\n", app_id, state); + + if (sh->server_context_list.empty()) + return; + + ::agl_shell_ipc::AppState app; + + sh->current_app_state.set_app_id(std::string(app_id)); + sh->current_app_state.set_state(state); + + auto start = sh->server_context_list.begin(); + while (start != sh->server_context_list.end()) { + LOG("writing to lister %p\n", static_cast(start->second)); + start->second->NextWrite(); + start++; + } } static const struct agl_shell_listener shell_listener = { @@ -294,7 +178,6 @@ destroy_output(struct window_output *w_output) free(w_output); } - static void global_add(void *data, struct wl_registry *reg, uint32_t id, const char *interface, uint32_t version) @@ -340,7 +223,8 @@ global_add_ext(void *data, struct wl_registry *reg, uint32_t id, static_cast(wl_registry_bind(reg, id, &agl_shell_ext_interface, std::min(static_cast(1), version))); - agl_shell_ext_add_listener(sh->shell_ext, &shell_ext_listener, data); + agl_shell_ext_add_listener(sh->shell_ext, + &shell_ext_listener, data); } } @@ -364,7 +248,7 @@ static const struct wl_registry_listener registry_listener = { }; static void -register_shell_ext(struct wl_display *wl_display) +register_shell_ext(struct wl_display *wl_display, struct shell_data *sh) { struct wl_registry *registry; @@ -377,7 +261,7 @@ register_shell_ext(struct wl_display *wl_display) } static void -register_shell(struct wl_display *wl_display) +register_shell(struct wl_display *wl_display, struct shell_data *sh) { struct wl_registry *registry; @@ -391,7 +275,21 @@ register_shell(struct wl_display *wl_display) wl_registry_destroy(registry); } -static int +static void +destroy_shell_data(struct shell_data *sh) +{ + struct window_output *w_output, *w_output_next; + + wl_list_for_each_safe(w_output, w_output_next, &sh->output_list, link) + destroy_output(w_output); + + wl_display_flush(sh->wl_display); + wl_display_disconnect(sh->wl_display); + + delete sh; +} + +static struct shell_data * start_agl_shell_client(void) { int ret = 0; @@ -399,22 +297,25 @@ start_agl_shell_client(void) wl_display = wl_display_connect(NULL); - sh = new struct shell_data; + struct shell_data *sh = new struct shell_data; + sh->wl_display = wl_display; sh->wait_for_doas = true; sh->wait_for_bound = true; - register_shell_ext(wl_display); + register_shell_ext(wl_display, sh); // check for agl_shell_ext if (!sh->shell_ext) { fprintf(stderr, "Failed to bind to agl_shell_ext interface\n"); - return -1; + delete sh; + return nullptr; } if (wl_list_empty(&sh->output_list)) { fprintf(stderr, "Failed get any outputs!\n"); - return -1; + delete sh; + return nullptr; } agl_shell_ext_doas_shell_client(sh->shell_ext); @@ -426,11 +327,12 @@ start_agl_shell_client(void) if (!sh->doas_ok) { fprintf(stderr, "Failed to get doas_done event\n"); - return -1; + delete sh; + return nullptr; } // bind to agl-shell - register_shell(wl_display); + register_shell(wl_display, sh); while (ret != -1 && sh->wait_for_bound) { ret = wl_display_dispatch(sh->wl_display); if (sh->wait_for_bound) @@ -440,34 +342,40 @@ start_agl_shell_client(void) // at this point, we can't do anything about it if (!sh->bound_ok) { fprintf(stderr, "Failed to get bound_ok event!\n"); - return -1; + delete sh; + return nullptr; } fprintf(stderr, "agl_shell/agl_shell_ext interface OK\n"); - std::shared_ptr agl_shell{sh->shell, agl_shell_destroy}; - sh->aglShell = new Shell(agl_shell); - return 0; + return sh; } static void -destroy_shell_data(void) +start_grpc_server(Shell *aglShell) { - struct window_output *w_output, *w_output_next; + // instantiante the grpc server + std::string server_address(kDefaultGrpcServiceAddress); + GrpcServiceImpl service{aglShell}; - wl_list_for_each_safe(w_output, w_output_next, &sh->output_list, link) - destroy_output(w_output); + grpc::EnableDefaultHealthCheckService(true); + grpc::reflection::InitProtoReflectionServerBuilderPlugin(); - wl_display_flush(sh->wl_display); - wl_display_disconnect(sh->wl_display); + grpc::ServerBuilder builder; + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + builder.RegisterService(&service); - delete sh; + std::unique_ptr server(builder.BuildAndStart()); + LOG("Server listening on %s\n", server_address.c_str()); + + server->Wait(); } int main(int argc, char **argv) { (void) argc; (void) argv; + Shell *aglShell; int ret = 0; // do not start right up, give shell client time to boot up @@ -479,14 +387,22 @@ int main(int argc, char **argv) nanosleep(&ts, NULL); - ret = start_agl_shell_client(); - if (ret) { - fprintf(stderr, "Failed to initialize agl-shell/agl-shell-ext\n"); + struct shell_data *sh = start_agl_shell_client(); + if (!sh) { + LOG("Failed to initialize agl-shell/agl-shell-ext\n"); exit(EXIT_FAILURE); } - start_grpc_server(); + std::shared_ptr agl_shell{sh->shell, agl_shell_destroy}; + aglShell = new Shell(agl_shell, sh); + + std::thread thread(start_grpc_server, aglShell); + + // serve wayland requests + while (running && ret != -1) { + ret = wl_display_dispatch(sh->wl_display); + } - destroy_shell_data(); + destroy_shell_data(sh); return 0; } diff --git a/clients/main-grpc.h b/clients/main-grpc.h new file mode 100644 index 0000000..9b687e9 --- /dev/null +++ b/clients/main-grpc.h @@ -0,0 +1,38 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "agl_shell.grpc.pb.h" + +// forward declaration created in grpc-async-cb +class Lister; + +struct shell_data { + struct wl_display *wl_display; + struct agl_shell *shell; + struct agl_shell_ext *shell_ext; + + bool wait_for_bound; + bool wait_for_doas; + + bool bound_ok; + bool doas_ok; + + uint32_t version; + struct wl_list output_list; /** window_output::link */ + + ::agl_shell_ipc::AppState current_app_state; + std::list > server_context_list; +}; + +struct window_output { + struct shell_data *shell_data; + struct wl_output *output; + char *name; + struct wl_list link; /** display::output_list */ +}; diff --git a/clients/meson.build b/clients/meson.build index ff27a60..648c95a 100644 --- a/clients/meson.build +++ b/clients/meson.build @@ -46,7 +46,9 @@ clients = [ { 'basename': 'agl-shell-grpc-server', 'sources': [ - 'grpc.cpp', + 'main-grpc.cpp', + 'grpc-async-cb.cpp', + 'shell.cpp', generated_protoc_sources, generated_grpc_sources, agl_shell_client_protocol_h, diff --git a/clients/shell.cpp b/clients/shell.cpp new file mode 100644 index 0000000..343b36e --- /dev/null +++ b/clients/shell.cpp @@ -0,0 +1,53 @@ +#include +#include +#include +#include +#include +#include + +#include "main-grpc.h" +#include "shell.h" + +void +Shell::ActivateApp(const std::string &app_id, const std::string &output_name) +{ + struct window_output *woutput, *w_output; + struct agl_shell *shell = this->m_shell.get(); + + woutput = nullptr; + w_output = nullptr; + + wl_list_for_each(woutput, &m_shell_data->output_list, link) { + if (woutput->name && !strcmp(woutput->name, output_name.c_str())) { + w_output = woutput; + break; + } + } + + // else, get the first one available + if (!w_output) + w_output = wl_container_of(m_shell_data->output_list.prev, + w_output, link); + + agl_shell_activate_app(shell, app_id.c_str(), w_output->output); + wl_display_flush(m_shell_data->wl_display); +} + +void +Shell::DeactivateApp(const std::string &app_id) +{ + (void) app_id; +} + +void +Shell::SetAppFloat(const std::string &app_id) +{ + (void) app_id; +} + +void +Shell::SetAppSplit(const std::string &app_id, uint32_t orientation) +{ + (void) app_id; + (void) orientation; +} diff --git a/clients/shell.h b/clients/shell.h new file mode 100644 index 0000000..1cdbd1d --- /dev/null +++ b/clients/shell.h @@ -0,0 +1,21 @@ +#pragma once + +#include + +#include "agl-shell-client-protocol.h" + +#include "main-grpc.h" + +class Shell { +public: + std::shared_ptr m_shell; + struct shell_data *m_shell_data; + + Shell(std::shared_ptr shell, + struct shell_data *sh_data) : + m_shell(shell), m_shell_data(sh_data) { } + void ActivateApp(const std::string &app_id, const std::string &output_name); + void DeactivateApp(const std::string &app_id); + void SetAppSplit(const std::string &app_id, uint32_t orientation); + void SetAppFloat(const std::string &app_id); +}; diff --git a/protocol/agl_shell.proto b/protocol/agl_shell.proto index 721fac2..414162b 100644 --- a/protocol/agl_shell.proto +++ b/protocol/agl_shell.proto @@ -3,10 +3,11 @@ import "google/protobuf/empty.proto"; package agl_shell_ipc; service AglShellManagerService { - rpc ActivateApp(ActivateRequest) returns (google.protobuf.Empty) {} - rpc DeactivateApp(DeactivateRequest) returns (google.protobuf.Empty) {} - rpc SetAppSplit(SplitRequest) returns (google.protobuf.Empty) {} - rpc SetAppFloat(FloatRequest) returns (google.protobuf.Empty) {} + rpc ActivateApp(ActivateRequest) returns (google.protobuf.Empty) {} + rpc DeactivateApp(DeactivateRequest) returns (google.protobuf.Empty) {} + rpc SetAppSplit(SplitRequest) returns (google.protobuf.Empty) {} + rpc SetAppFloat(FloatRequest) returns (google.protobuf.Empty) {} + rpc AppStatusState(google.protobuf.Empty) returns (stream AppState) {} } message ActivateRequest { @@ -27,3 +28,7 @@ message FloatRequest { string app_id = 1; } +message AppState { + int32 state = 1; + string app_id = 2; +} diff --git a/src/ivi-compositor.h b/src/ivi-compositor.h index 889c3f3..891c093 100644 --- a/src/ivi-compositor.h +++ b/src/ivi-compositor.h @@ -86,8 +86,6 @@ struct ivi_compositor { struct { struct wl_client *client; struct wl_resource *resource; - - struct wl_client *client_ext; struct wl_resource *resource_ext; bool ready; enum agl_shell_bound_status status; diff --git a/src/shell.c b/src/shell.c index f8d2e32..d583fe5 100644 --- a/src/shell.c +++ b/src/shell.c @@ -1145,12 +1145,16 @@ shell_send_app_state(struct ivi_compositor *ivi, const char *app_id, if (app_id && wl_resource_get_version(ivi->shell_client.resource) >= AGL_SHELL_APP_STATE_SINCE_VERSION) { + weston_log("%s() should sent app_state\n", __func__); agl_shell_send_app_state(ivi->shell_client.resource, app_id, state); - if (ivi->shell_client_ext.resource) - agl_shell_send_app_state(ivi->shell_client_ext.resource, + if (ivi->shell_client.resource_ext) { + weston_log("%s() 2. should sent app_state %p\n", + __func__, ivi->shell_client.resource_ext); + agl_shell_send_app_state(ivi->shell_client.resource_ext, app_id, state); + } } } @@ -1634,13 +1638,13 @@ bind_agl_shell(struct wl_client *client, wl_resource_set_implementation(resource, &agl_shell_implementation, ivi, NULL); - ivi->shell_client_ext.resource = resource; + ivi->shell_client.resource_ext = resource; if (ivi->shell_client.status == BOUND_OK && wl_resource_get_version(resource) >= AGL_SHELL_BOUND_OK_SINCE_VERSION) { - weston_log("Sent agl_shell_send_bound_ok to client ext\n"); ivi->shell_client_ext.status = BOUND_OK; - agl_shell_send_bound_ok(ivi->shell_client_ext.resource); + agl_shell_send_bound_ok(ivi->shell_client.resource_ext); + weston_log("Sent agl_shell_send_bound_ok to client ext %p\n", ivi->shell_client.resource_ext); } return;