Add more grpc - Asyncstuff
[src/agl-compositor.git] / clients / grpc-async.cpp
1 #include <cstdio>
2 #include <ctime>
3 #include <algorithm>
4 #include <queue>
5
6 #include <grpc/grpc.h>
7 #include <grpcpp/grpcpp.h>
8 #include <grpcpp/server.h>
9 #include <grpcpp/server_builder.h>
10 #include <grpcpp/server_context.h>
11
12 #include <grpcpp/ext/proto_server_reflection_plugin.h>
13 #include <grpcpp/health_check_service_interface.h>
14
15 #include "agl_shell.grpc.pb.h"
16 #include "grpc-async.h"
17
18 void
19 CallData::Proceed(void)
20 {
21         switch (m_status) {
22         case CREATE:
23                 // Make this instance progress to the PROCESS state.
24                 m_status = PROCESS;
25                 std::cout << "Creating Call data for new client connections: "
26                         << this << std::endl;
27
28                 // As part of the initial CREATE state, we *request* that the
29                 // system start processing AppStatusState requests.
30                 //
31                 // In this request, "this" acts are the tag uniquely
32                 // identifying the request (so that different CallData
33                 // instances can serve different requests concurrently), in
34                 // this case the memory address of this CallData instance.
35                 m_service->RequestAppStatusState(&m_ctx, &m_request, &m_responder,
36                                                  m_cq, m_cq, (void *) this);
37                 break;
38         case PROCESS:
39                 // Spawn a new CallData instance to serve new clients while we
40                 // process the one for this CallData. The instance will
41                 // deallocate itself as part of its FINISH state.
42                 CallData *cd = new CallData(m_service, m_cq);
43
44                 // The actual processing.
45                 m_status = PROCESSING;
46                 m_repliesSent++;
47                 break;
48         case PROCESSING:
49                 if (m_repliesSent == MAX_REPLIES) {
50                         // And we are done! Let the gRPC runtime know we've
51                         // finished, using the memory address of this instance
52                         // as the uniquely identifying tag for the event.
53                         m_status = FINISH;
54                         m_responder.Finish(Status::OK, this);
55                 } else {
56                         // The actual processing.
57                         m_status = PROCESSING;
58                         m_repliesSent++;
59                 }
60                 break;
61         case FINISH:
62                 GPR_ASSERT(m_status == FINISH);
63                 std::cout << "Completed RPC for: " << this << std::endl;
64                 // Once in the FINISH state, deallocate ourselves (CallData).
65                 delete this;
66                 break;
67         default:
68                 break;
69         }
70 }
71
72 GrpcServiceImpl::~GrpcServiceImpl()
73 {
74         m_server->Shutdown();
75         // Always shutdown the completion queue after the server.
76         m_cq->Shutdown();
77 }
78
79 void
80 GrpcServiceImpl::Run(void)
81 {
82         std::string server_address(kDefaultGrpcServiceAddress);
83
84         grpc::ServerBuilder builder;
85         builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
86
87         builder.RegisterService(&m_service);
88         m_cq = builder.AddCompletionQueue();
89
90         m_server = builder.BuildAndStart();
91         std::cout << "Server listening on " << server_address << std::endl;
92
93         // Proceed to the server's main loop.
94         HandleRpcs();
95 }
96
97 void
98 GrpcServiceImpl::HandleRpcs(void)
99 {
100         // Spawn a new CallData instance to serve new clients.
101         CallData *cd = new CallData(&m_service, m_cq.get());
102
103         // uniquely identifies a request.
104         void *tag;
105         bool ok;
106
107         // Block waiting to read the next event from the completion queue. The
108         // event is uniquely identified by its tag, which in this case is the
109         // memory address of a CallData instance.
110         //
111         // The return value of Next should always be checked. This return value
112         // tells us whether there is any kind of event or cq_ is shutting down.
113         while (true) {
114                 std::cout << "Blocked on next waiting for events" << std::endl;
115                 GPR_ASSERT(m_cq->Next(&tag, &ok));
116                 GPR_ASSERT(ok);
117
118                 std::cout << "Calling tag " << tag << " with Proceed()" << std::endl;
119                 static_cast<CallData*>(tag)->Proceed();
120         }
121 }