Fixed /exec input stream and /signal.
[src/xds/xds-agent.git] / lib / agent / xdsserver.go
1 package agent
2
3 import (
4         "encoding/json"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "net/http"
9         "strings"
10         "sync"
11         "time"
12
13         "github.com/gin-gonic/gin"
14         "github.com/iotbzh/xds-agent/lib/xaapiv1"
15         "github.com/iotbzh/xds-agent/lib/xdsconfig"
16         common "github.com/iotbzh/xds-common/golib"
17         "github.com/iotbzh/xds-server/lib/xsapiv1"
18         uuid "github.com/satori/go.uuid"
19         sio_client "github.com/sebd71/go-socket.io-client"
20 )
21
22 // XdsServer .
23 type XdsServer struct {
24         *Context
25         ID           string
26         BaseURL      string
27         APIURL       string
28         PartialURL   string
29         ConnRetry    int
30         Connected    bool
31         Disabled     bool
32         ServerConfig *xsapiv1.APIConfig
33
34         // Events management
35         CBOnError      func(error)
36         CBOnDisconnect func(error)
37         sockEvents     map[string][]*caller
38         sockEventsLock *sync.Mutex
39
40         // Private fields
41         client    *common.HTTPClient
42         ioSock    *sio_client.Client
43         logOut    io.Writer
44         apiRouter *gin.RouterGroup
45         cmdList   map[string]interface{}
46 }
47
// EventCB is the signature of an event listener: it receives the private
// data supplied at registration time followed by the event payload.
type EventCB func(privData, evtData interface{}) error
50
51 // caller Used to chain event listeners
52 type caller struct {
53         id          uuid.UUID
54         EventName   string
55         Func        EventCB
56         PrivateData interface{}
57 }
58
// _IDTempoPrefix marks a server ID as temporary, i.e. generated locally
// before the real server UID is known.
const (
	_IDTempoPrefix = "tempo-"
)
60
61 // NewXdsServer creates an instance of XdsServer
62 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
63         return &XdsServer{
64                 Context:    ctx,
65                 ID:         _IDTempoPrefix + uuid.NewV1().String(),
66                 BaseURL:    conf.URL,
67                 APIURL:     conf.APIBaseURL + conf.APIPartialURL,
68                 PartialURL: conf.APIPartialURL,
69                 ConnRetry:  conf.ConnRetry,
70                 Connected:  false,
71                 Disabled:   false,
72
73                 sockEvents:     make(map[string][]*caller),
74                 sockEventsLock: &sync.Mutex{},
75                 logOut:         ctx.Log.Out,
76                 cmdList:        make(map[string]interface{}),
77         }
78 }
79
80 // Close Free and close XDS Server connection
81 func (xs *XdsServer) Close() error {
82         xs.Connected = false
83         xs.Disabled = true
84         xs.ioSock = nil
85         xs._NotifyState()
86         return nil
87 }
88
89 // Connect Establish HTTP connection with XDS Server
90 func (xs *XdsServer) Connect() error {
91         var err error
92         var retry int
93
94         xs.Disabled = false
95         xs.Connected = false
96
97         err = nil
98         for retry = xs.ConnRetry; retry > 0; retry-- {
99                 if err = xs._CreateConnectHTTP(); err == nil {
100                         break
101                 }
102                 if retry == xs.ConnRetry {
103                         // Notify only on the first conn error
104                         // doing that avoid 2 notifs (conn false; conn true) on startup
105                         xs._NotifyState()
106                 }
107                 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
108                 time.Sleep(time.Second)
109         }
110         if retry == 0 {
111                 // FIXME: re-use _reconnect to wait longer in background
112                 return fmt.Errorf("Connection to XDS Server failure")
113         }
114         if err != nil {
115                 return err
116         }
117
118         // Check HTTP connection and establish WS connection
119         err = xs._connect(false)
120
121         return err
122 }
123
124 // IsTempoID returns true when server as a temporary id
125 func (xs *XdsServer) IsTempoID() bool {
126         return strings.HasPrefix(xs.ID, _IDTempoPrefix)
127 }
128
129 // SetLoggerOutput Set logger ou
130 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
131         xs.logOut = out
132 }
133
134 // SendCommand Send a command to XDS Server
135 func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error {
136         url := cmd
137         if !strings.HasPrefix("/", cmd) {
138                 url = "/" + cmd
139         }
140         return xs.client.Post(url, string(body), res)
141 }
142
143 // GetVersion Send Get request to retrieve XDS Server version
144 func (xs *XdsServer) GetVersion(res interface{}) error {
145         return xs.client.Get("/version", &res)
146 }
147
148 // GetFolders Send GET request to get current folder configuration
149 func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error {
150         return xs.client.Get("/folders", folders)
151 }
152
153 // FolderAdd Send POST request to add a folder
154 func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error {
155         err := xs.client.Post("/folders", fld, res)
156         if err != nil {
157                 return fmt.Errorf("FolderAdd error: %s", err.Error())
158         }
159         return err
160 }
161
162 // FolderDelete Send DELETE request to delete a folder
163 func (xs *XdsServer) FolderDelete(id string) error {
164         return xs.client.HTTPDelete("/folders/" + id)
165 }
166
167 // FolderSync Send POST request to force synchronization of a folder
168 func (xs *XdsServer) FolderSync(id string) error {
169         return xs.client.HTTPPost("/folders/sync/"+id, "")
170 }
171
172 // FolderUpdate Send PUT request to update a folder
173 func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error {
174         return xs.client.Put("/folders/"+fld.ID, fld, resFld)
175 }
176
177 // CommandExec Send POST request to execute a command
178 func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error {
179         return xs.client.Post("/exec", args, res)
180 }
181
182 // CommandSignal Send POST request to send a signal to a command
183 func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error {
184         return xs.client.Post("/signal", args, res)
185 }
186
187 // SetAPIRouterGroup .
188 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
189         xs.apiRouter = r
190 }
191
192 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
193 func (xs *XdsServer) PassthroughGet(url string) {
194         if xs.apiRouter == nil {
195                 xs.Log.Errorf("apiRouter not set !")
196                 return
197         }
198
199         xs.apiRouter.GET(url, func(c *gin.Context) {
200                 var data interface{}
201                 // Take care of param (eg. id in /projects/:id)
202                 nURL := url
203                 if strings.Contains(url, ":") {
204                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
205                 }
206                 // Send Get request
207                 if err := xs.client.Get(nURL, &data); err != nil {
208                         if strings.Contains(err.Error(), "connection refused") {
209                                 xs.Connected = false
210                                 xs._NotifyState()
211                         }
212                         common.APIError(c, err.Error())
213                         return
214                 }
215
216                 c.JSON(http.StatusOK, data)
217         })
218 }
219
220 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
221 func (xs *XdsServer) PassthroughPost(url string) {
222         if xs.apiRouter == nil {
223                 xs.Log.Errorf("apiRouter not set !")
224                 return
225         }
226
227         xs.apiRouter.POST(url, func(c *gin.Context) {
228                 bodyReq := []byte{}
229                 n, err := c.Request.Body.Read(bodyReq)
230                 if err != nil {
231                         common.APIError(c, err.Error())
232                         return
233                 }
234
235                 // Take care of param (eg. id in /projects/:id)
236                 nURL := url
237                 if strings.Contains(url, ":") {
238                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
239                 }
240
241                 // Send Post request
242                 body, err := json.Marshal(bodyReq[:n])
243                 if err != nil {
244                         common.APIError(c, err.Error())
245                         return
246                 }
247
248                 response, err := xs.client.HTTPPostWithRes(nURL, string(body))
249                 if err != nil {
250                         common.APIError(c, err.Error())
251                         return
252                 }
253
254                 bodyRes, err := ioutil.ReadAll(response.Body)
255                 if err != nil {
256                         common.APIError(c, "Cannot read response body")
257                         return
258                 }
259                 c.JSON(http.StatusOK, string(bodyRes))
260         })
261 }
262
263 // EventRegister Post a request to register to an XdsServer event
264 func (xs *XdsServer) EventRegister(evName string, id string) error {
265         return xs.client.Post(
266                 "/events/register",
267                 xsapiv1.EventRegisterArgs{
268                         Name:      evName,
269                         ProjectID: id,
270                 },
271                 nil)
272 }
273
274 // EventEmit Emit a event to XDS Server through WS
275 func (xs *XdsServer) EventEmit(message string, args ...interface{}) error {
276         if xs.ioSock == nil {
277                 return fmt.Errorf("Io.Socket not initialized")
278         }
279
280         return xs.ioSock.Emit(message, args...)
281 }
282
283 // EventOn Register a callback on events reception
284 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
285         if xs.ioSock == nil {
286                 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
287         }
288
289         xs.sockEventsLock.Lock()
290         defer xs.sockEventsLock.Unlock()
291
292         if _, exist := xs.sockEvents[evName]; !exist {
293                 // Register listener only the first time
294                 evn := evName
295
296                 // FIXME: use generic type: data interface{} instead of data xsapiv1.EventMsg
297                 var err error
298                 if evName == "event:folder-state-change" {
299                         err = xs.ioSock.On(evn, func(data xsapiv1.EventMsg) error {
300                                 xs.sockEventsLock.Lock()
301                                 sEvts := make([]*caller, len(xs.sockEvents[evn]))
302                                 copy(sEvts, xs.sockEvents[evn])
303                                 xs.sockEventsLock.Unlock()
304                                 for _, c := range sEvts {
305                                         c.Func(c.PrivateData, data)
306                                 }
307                                 return nil
308                         })
309                 } else {
310                         err = xs.ioSock.On(evn, func(data interface{}) error {
311                                 xs.sockEventsLock.Lock()
312                                 sEvts := make([]*caller, len(xs.sockEvents[evn]))
313                                 copy(sEvts, xs.sockEvents[evn])
314                                 xs.sockEventsLock.Unlock()
315                                 for _, c := range sEvts {
316                                         c.Func(c.PrivateData, data)
317                                 }
318                                 return nil
319                         })
320                 }
321                 if err != nil {
322                         return uuid.Nil, err
323                 }
324         }
325
326         c := &caller{
327                 id:          uuid.NewV1(),
328                 EventName:   evName,
329                 Func:        f,
330                 PrivateData: privData,
331         }
332
333         xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
334         return c.id, nil
335 }
336
337 // EventOff Un-register a (or all) callbacks associated to an event
338 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
339         xs.sockEventsLock.Lock()
340         defer xs.sockEventsLock.Unlock()
341         if _, exist := xs.sockEvents[evName]; exist {
342                 if id == uuid.Nil {
343                         // Un-register all
344                         xs.sockEvents[evName] = []*caller{}
345                 } else {
346                         // Un-register only the specified callback
347                         for i, ff := range xs.sockEvents[evName] {
348                                 if uuid.Equal(ff.id, id) {
349                                         xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
350                                         break
351                                 }
352                         }
353                 }
354         }
355         return nil
356 }
357
358 // ProjectToFolder Convert Project structure to Folder structure
359 func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig {
360         stID := ""
361         if pPrj.Type == xsapiv1.TypeCloudSync {
362                 stID, _ = xs.SThg.IDGet()
363         }
364         // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
365         fPrj := xsapiv1.FolderConfig{
366                 ID:         pPrj.ID,
367                 Label:      pPrj.Label,
368                 ClientPath: pPrj.ClientPath,
369                 Type:       xsapiv1.FolderType(pPrj.Type),
370                 Status:     pPrj.Status,
371                 IsInSync:   pPrj.IsInSync,
372                 DefaultSdk: pPrj.DefaultSdk,
373                 ClientData: pPrj.ClientData,
374                 DataPathMap: xsapiv1.PathMapConfig{
375                         ServerPath: pPrj.ServerPath,
376                 },
377                 DataCloudSync: xsapiv1.CloudSyncConfig{
378                         SyncThingID:   stID,
379                         STLocIsInSync: pPrj.IsInSync,
380                         STLocStatus:   pPrj.Status,
381                         STSvrIsInSync: pPrj.IsInSync,
382                         STSvrStatus:   pPrj.Status,
383                 },
384         }
385
386         return &fPrj
387 }
388
389 // FolderToProject Convert Folder structure to Project structure
390 func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig {
391         inSync := fPrj.IsInSync
392         sts := fPrj.Status
393
394         if fPrj.Type == xsapiv1.TypeCloudSync {
395                 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
396
397                 sts = fPrj.DataCloudSync.STSvrStatus
398                 switch fPrj.DataCloudSync.STLocStatus {
399                 case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause:
400                         sts = fPrj.DataCloudSync.STLocStatus
401                         break
402                 case xaapiv1.StatusSyncing:
403                         if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause {
404                                 sts = xaapiv1.StatusSyncing
405                         }
406                         break
407                 case xaapiv1.StatusEnable:
408                         // keep STSvrStatus
409                         break
410                 }
411         }
412
413         pPrj := xaapiv1.ProjectConfig{
414                 ID:         fPrj.ID,
415                 ServerID:   xs.ID,
416                 Label:      fPrj.Label,
417                 ClientPath: fPrj.ClientPath,
418                 ServerPath: fPrj.DataPathMap.ServerPath,
419                 Type:       xaapiv1.ProjectType(fPrj.Type),
420                 Status:     sts,
421                 IsInSync:   inSync,
422                 DefaultSdk: fPrj.DefaultSdk,
423                 ClientData: fPrj.ClientData,
424         }
425         return pPrj
426 }
427
428 // CommandAdd Add a new command to the list of running commands
429 func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error {
430         if xs.CommandGet(cmdID) != nil {
431                 return fmt.Errorf("command id already exist")
432         }
433         xs.cmdList[cmdID] = data
434         return nil
435 }
436
437 // CommandDelete Delete a command from the command list
438 func (xs *XdsServer) CommandDelete(cmdID string) error {
439         if xs.CommandGet(cmdID) == nil {
440                 return fmt.Errorf("unknown command id")
441         }
442         delete(xs.cmdList, cmdID)
443         return nil
444 }
445
446 // CommandGet Retrieve a command data
447 func (xs *XdsServer) CommandGet(cmdID string) interface{} {
448         d, exist := xs.cmdList[cmdID]
449         if exist {
450                 return d
451         }
452         return nil
453 }
454
455 /***
456 ** Private functions
457 ***/
458
459 // Create HTTP client
460 func (xs *XdsServer) _CreateConnectHTTP() error {
461         var err error
462         xs.client, err = common.HTTPNewClient(xs.BaseURL,
463                 common.HTTPClientConfig{
464                         URLPrefix:           "/api/v1",
465                         HeaderClientKeyName: "Xds-Sid",
466                         CsrfDisable:         true,
467                         LogOut:              xs.logOut,
468                         LogPrefix:           "XDSSERVER: ",
469                         LogLevel:            common.HTTPLogLevelWarning,
470                 })
471
472         xs.client.SetLogLevel(xs.Log.Level.String())
473
474         if err != nil {
475                 msg := ": " + err.Error()
476                 if strings.Contains(err.Error(), "connection refused") {
477                         msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
478                 }
479                 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
480         }
481         if xs.client == nil {
482                 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
483         }
484
485         return nil
486 }
487
488 //  Re-established connection
489 func (xs *XdsServer) _reconnect() error {
490         err := xs._connect(true)
491         if err == nil {
492                 // Reload projects list for this server
493                 err = xs.projects.Init(xs)
494         }
495         return err
496 }
497
498 //  Established HTTP and WS connection and retrieve XDSServer config
499 func (xs *XdsServer) _connect(reConn bool) error {
500
501         xdsCfg := xsapiv1.APIConfig{}
502         if err := xs.client.Get("/config", &xdsCfg); err != nil {
503                 xs.Connected = false
504                 if !reConn {
505                         xs._NotifyState()
506                 }
507                 return err
508         }
509
510         if reConn && xs.ID != xdsCfg.ServerUID {
511                 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID)
512         }
513
514         // Update local XDS config
515         xs.ID = xdsCfg.ServerUID
516         xs.ServerConfig = &xdsCfg
517
518         // Establish WS connection and register listen
519         if err := xs._SocketConnect(); err != nil {
520                 xs.Connected = false
521                 xs._NotifyState()
522                 return err
523         }
524
525         xs.Connected = true
526         xs._NotifyState()
527         return nil
528 }
529
530 // Create WebSocket (io.socket) connection
531 func (xs *XdsServer) _SocketConnect() error {
532
533         xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
534
535         opts := &sio_client.Options{
536                 Transport: "websocket",
537                 Header:    make(map[string][]string),
538         }
539         opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
540
541         iosk, err := sio_client.NewClient(xs.BaseURL, opts)
542         if err != nil {
543                 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
544         }
545         xs.ioSock = iosk
546
547         // Register some listeners
548
549         iosk.On("error", func(err error) {
550                 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
551                 if xs.CBOnError != nil {
552                         xs.CBOnError(err)
553                 }
554         })
555
556         iosk.On("disconnection", func(err error) {
557                 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
558                 if xs.CBOnDisconnect != nil {
559                         xs.CBOnDisconnect(err)
560                 }
561                 xs.Connected = false
562                 xs._NotifyState()
563
564                 // Try to reconnect during 15min (or at least while not disabled)
565                 go func() {
566                         count := 0
567                         waitTime := 1
568                         for !xs.Disabled && !xs.Connected {
569                                 count++
570                                 if count%60 == 0 {
571                                         waitTime *= 5
572                                 }
573                                 if waitTime > 15*60 {
574                                         xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
575                                         return
576                                 }
577                                 time.Sleep(time.Second * time.Duration(waitTime))
578                                 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
579
580                                 xs._reconnect()
581                         }
582                 }()
583         })
584
585         // XXX - There is no connection event generated so, just consider that
586         // we are connected when NewClient return successfully
587         /* iosk.On("connection", func() { ... }) */
588         xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
589
590         return nil
591 }
592
593 // Send event to notify changes
594 func (xs *XdsServer) _NotifyState() {
595
596         evSts := xaapiv1.ServerCfg{
597                 ID:         xs.ID,
598                 URL:        xs.BaseURL,
599                 APIURL:     xs.APIURL,
600                 PartialURL: xs.PartialURL,
601                 ConnRetry:  xs.ConnRetry,
602                 Connected:  xs.Connected,
603         }
604         if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil {
605                 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
606         }
607 }