Fixed xds-server folder events detection.
[src/xds/xds-agent.git] / lib / agent / xdsserver.go
1 /*
2  * Copyright (C) 2017 "IoT.bzh"
3  * Author Sebastien Douheret <sebastien@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package agent
19
20 import (
21         "encoding/json"
22         "fmt"
23         "io"
24         "io/ioutil"
25         "net/http"
26         "strings"
27         "sync"
28         "time"
29
30         "github.com/gin-gonic/gin"
31         "github.com/iotbzh/xds-agent/lib/xaapiv1"
32         "github.com/iotbzh/xds-agent/lib/xdsconfig"
33         common "github.com/iotbzh/xds-common/golib"
34         "github.com/iotbzh/xds-server/lib/xsapiv1"
35         uuid "github.com/satori/go.uuid"
36         sio_client "github.com/sebd71/go-socket.io-client"
37 )
38
39 // XdsServer .
40 type XdsServer struct {
41         *Context
42         ID           string
43         BaseURL      string
44         APIURL       string
45         PartialURL   string
46         ConnRetry    int
47         Connected    bool
48         Disabled     bool
49         ServerConfig *xsapiv1.APIConfig
50
51         // Events management
52         CBOnError      func(error)
53         CBOnDisconnect func(error)
54         sockEvents     map[string][]*caller
55         sockEventsLock *sync.Mutex
56
57         // Private fields
58         client    *common.HTTPClient
59         ioSock    *sio_client.Client
60         logOut    io.Writer
61         apiRouter *gin.RouterGroup
62         cmdList   map[string]interface{}
63 }
64
65 // EventCB Event emitter callback
66 type EventCB func(privData interface{}, evtData interface{}) error
67
68 // caller Used to chain event listeners
69 type caller struct {
70         id          uuid.UUID
71         EventName   string
72         Func        EventCB
73         PrivateData interface{}
74 }
75
76 const _IDTempoPrefix = "tempo-"
77
78 // NewXdsServer creates an instance of XdsServer
79 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
80         return &XdsServer{
81                 Context:    ctx,
82                 ID:         _IDTempoPrefix + uuid.NewV1().String(),
83                 BaseURL:    conf.URL,
84                 APIURL:     conf.APIBaseURL + conf.APIPartialURL,
85                 PartialURL: conf.APIPartialURL,
86                 ConnRetry:  conf.ConnRetry,
87                 Connected:  false,
88                 Disabled:   false,
89
90                 sockEvents:     make(map[string][]*caller),
91                 sockEventsLock: &sync.Mutex{},
92                 logOut:         ctx.Log.Out,
93                 cmdList:        make(map[string]interface{}),
94         }
95 }
96
97 // Close Free and close XDS Server connection
98 func (xs *XdsServer) Close() error {
99         err := xs._Disconnected()
100         xs.Disabled = true
101         return err
102 }
103
104 // Connect Establish HTTP connection with XDS Server
105 func (xs *XdsServer) Connect() error {
106         var err error
107         var retry int
108
109         xs.Disabled = false
110         xs.Connected = false
111
112         err = nil
113         for retry = xs.ConnRetry; retry > 0; retry-- {
114                 if err = xs._CreateConnectHTTP(); err == nil {
115                         break
116                 }
117                 if retry == xs.ConnRetry {
118                         // Notify only on the first conn error
119                         // doing that avoid 2 notifs (conn false; conn true) on startup
120                         xs._NotifyState()
121                 }
122                 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
123                 time.Sleep(time.Second)
124         }
125         if retry == 0 {
126                 // FIXME: re-use _Reconnect to wait longer in background
127                 return fmt.Errorf("Connection to XDS Server failure")
128         }
129         if err != nil {
130                 return err
131         }
132
133         // Check HTTP connection and establish WS connection
134         err = xs._Connect(false)
135
136         return err
137 }
138
139 // IsTempoID returns true when server as a temporary id
140 func (xs *XdsServer) IsTempoID() bool {
141         return strings.HasPrefix(xs.ID, _IDTempoPrefix)
142 }
143
144 // SetLoggerOutput Set logger ou
145 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
146         xs.logOut = out
147 }
148
149 // SendCommand Send a command to XDS Server
150 func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error {
151         url := cmd
152         if !strings.HasPrefix("/", cmd) {
153                 url = "/" + cmd
154         }
155         return xs.client.Post(url, string(body), res)
156 }
157
158 // GetVersion Send Get request to retrieve XDS Server version
159 func (xs *XdsServer) GetVersion(res interface{}) error {
160         return xs.client.Get("/version", &res)
161 }
162
163 // GetFolders Send GET request to get current folder configuration
164 func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error {
165         return xs.client.Get("/folders", folders)
166 }
167
168 // FolderAdd Send POST request to add a folder
169 func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error {
170         err := xs.client.Post("/folders", fld, res)
171         if err != nil {
172                 return fmt.Errorf("FolderAdd error: %s", err.Error())
173         }
174         return err
175 }
176
177 // FolderDelete Send DELETE request to delete a folder
178 func (xs *XdsServer) FolderDelete(id string) error {
179         return xs.client.HTTPDelete("/folders/" + id)
180 }
181
182 // FolderSync Send POST request to force synchronization of a folder
183 func (xs *XdsServer) FolderSync(id string) error {
184         return xs.client.HTTPPost("/folders/sync/"+id, "")
185 }
186
187 // FolderUpdate Send PUT request to update a folder
188 func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error {
189         return xs.client.Put("/folders/"+fld.ID, fld, resFld)
190 }
191
192 // CommandExec Send POST request to execute a command
193 func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error {
194         return xs.client.Post("/exec", args, res)
195 }
196
197 // CommandSignal Send POST request to send a signal to a command
198 func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error {
199         return xs.client.Post("/signal", args, res)
200 }
201
202 // SetAPIRouterGroup .
203 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
204         xs.apiRouter = r
205 }
206
207 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
208 func (xs *XdsServer) PassthroughGet(url string) {
209         if xs.apiRouter == nil {
210                 xs.Log.Errorf("apiRouter not set !")
211                 return
212         }
213
214         xs.apiRouter.GET(url, func(c *gin.Context) {
215                 var data interface{}
216                 // Take care of param (eg. id in /projects/:id)
217                 nURL := url
218                 if strings.Contains(url, ":") {
219                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
220                 }
221                 // Send Get request
222                 if err := xs.client.Get(nURL, &data); err != nil {
223                         if strings.Contains(err.Error(), "connection refused") {
224                                 xs._Disconnected()
225                         }
226                         common.APIError(c, err.Error())
227                         return
228                 }
229
230                 c.JSON(http.StatusOK, data)
231         })
232 }
233
234 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
235 func (xs *XdsServer) PassthroughPost(url string) {
236         if xs.apiRouter == nil {
237                 xs.Log.Errorf("apiRouter not set !")
238                 return
239         }
240
241         xs.apiRouter.POST(url, func(c *gin.Context) {
242                 bodyReq := []byte{}
243                 n, err := c.Request.Body.Read(bodyReq)
244                 if err != nil {
245                         common.APIError(c, err.Error())
246                         return
247                 }
248
249                 // Take care of param (eg. id in /projects/:id)
250                 nURL := url
251                 if strings.Contains(url, ":") {
252                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
253                 }
254
255                 // Send Post request
256                 body, err := json.Marshal(bodyReq[:n])
257                 if err != nil {
258                         common.APIError(c, err.Error())
259                         return
260                 }
261
262                 response, err := xs.client.HTTPPostWithRes(nURL, string(body))
263                 if err != nil {
264                         common.APIError(c, err.Error())
265                         return
266                 }
267
268                 bodyRes, err := ioutil.ReadAll(response.Body)
269                 if err != nil {
270                         common.APIError(c, "Cannot read response body")
271                         return
272                 }
273                 c.JSON(http.StatusOK, string(bodyRes))
274         })
275 }
276
277 // EventRegister Post a request to register to an XdsServer event
278 func (xs *XdsServer) EventRegister(evName string, id string) error {
279         return xs.client.Post(
280                 "/events/register",
281                 xsapiv1.EventRegisterArgs{
282                         Name:      evName,
283                         ProjectID: id,
284                 },
285                 nil)
286 }
287
288 // EventEmit Emit a event to XDS Server through WS
289 func (xs *XdsServer) EventEmit(message string, args ...interface{}) error {
290         if xs.ioSock == nil {
291                 return fmt.Errorf("Io.Socket not initialized")
292         }
293
294         return xs.ioSock.Emit(message, args...)
295 }
296
297 // EventOn Register a callback on events reception
298 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
299         if xs.ioSock == nil {
300                 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
301         }
302
303         xs.sockEventsLock.Lock()
304         defer xs.sockEventsLock.Unlock()
305
306         if _, exist := xs.sockEvents[evName]; !exist {
307                 // Register listener only the first time
308                 evn := evName
309
310                 err := xs.ioSock.On(evn, func(data interface{}) error {
311                                 xs.sockEventsLock.Lock()
312                                 sEvts := make([]*caller, len(xs.sockEvents[evn]))
313                                 copy(sEvts, xs.sockEvents[evn])
314                                 xs.sockEventsLock.Unlock()
315                                 for _, c := range sEvts {
316                                         c.Func(c.PrivateData, data)
317                                 }
318                                 return nil
319                         })
320                 if err != nil {
321                         return uuid.Nil, err
322                 }
323         }
324
325         c := &caller{
326                 id:          uuid.NewV1(),
327                 EventName:   evName,
328                 Func:        f,
329                 PrivateData: privData,
330         }
331
332         xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
333         xs.LogSillyf("XS EventOn: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
334         return c.id, nil
335 }
336
337 // EventOff Un-register a (or all) callbacks associated to an event
338 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
339         xs.sockEventsLock.Lock()
340         defer xs.sockEventsLock.Unlock()
341         if _, exist := xs.sockEvents[evName]; exist {
342                 if id == uuid.Nil {
343                         // Un-register all
344                         xs.sockEvents[evName] = []*caller{}
345                 } else {
346                         // Un-register only the specified callback
347                         for i, ff := range xs.sockEvents[evName] {
348                                 if uuid.Equal(ff.id, id) {
349                                         xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
350                                         break
351                                 }
352                         }
353                 }
354         }
355         xs.LogSillyf("XS EventOff: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
356         return nil
357 }
358
359 // ProjectToFolder Convert Project structure to Folder structure
360 func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig {
361         stID := ""
362         if pPrj.Type == xsapiv1.TypeCloudSync {
363                 stID, _ = xs.SThg.IDGet()
364         }
365         // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
366         fPrj := xsapiv1.FolderConfig{
367                 ID:         pPrj.ID,
368                 Label:      pPrj.Label,
369                 ClientPath: pPrj.ClientPath,
370                 Type:       xsapiv1.FolderType(pPrj.Type),
371                 Status:     pPrj.Status,
372                 IsInSync:   pPrj.IsInSync,
373                 DefaultSdk: pPrj.DefaultSdk,
374                 ClientData: pPrj.ClientData,
375                 DataPathMap: xsapiv1.PathMapConfig{
376                         ServerPath: pPrj.ServerPath,
377                 },
378                 DataCloudSync: xsapiv1.CloudSyncConfig{
379                         SyncThingID:   stID,
380                         STLocIsInSync: pPrj.IsInSync,
381                         STLocStatus:   pPrj.Status,
382                         STSvrIsInSync: pPrj.IsInSync,
383                         STSvrStatus:   pPrj.Status,
384                 },
385         }
386
387         return &fPrj
388 }
389
390 // FolderToProject Convert Folder structure to Project structure
391 func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig {
392         inSync := fPrj.IsInSync
393         sts := fPrj.Status
394
395         if fPrj.Type == xsapiv1.TypeCloudSync {
396                 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
397
398                 sts = fPrj.DataCloudSync.STSvrStatus
399                 switch fPrj.DataCloudSync.STLocStatus {
400                 case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause:
401                         sts = fPrj.DataCloudSync.STLocStatus
402                         break
403                 case xaapiv1.StatusSyncing:
404                         if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause {
405                                 sts = xaapiv1.StatusSyncing
406                         }
407                         break
408                 case xaapiv1.StatusEnable:
409                         // keep STSvrStatus
410                         break
411                 }
412         }
413
414         pPrj := xaapiv1.ProjectConfig{
415                 ID:         fPrj.ID,
416                 ServerID:   xs.ID,
417                 Label:      fPrj.Label,
418                 ClientPath: fPrj.ClientPath,
419                 ServerPath: fPrj.DataPathMap.ServerPath,
420                 Type:       xaapiv1.ProjectType(fPrj.Type),
421                 Status:     sts,
422                 IsInSync:   inSync,
423                 DefaultSdk: fPrj.DefaultSdk,
424                 ClientData: fPrj.ClientData,
425         }
426         return pPrj
427 }
428
429 // CommandAdd Add a new command to the list of running commands
430 func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error {
431         if xs.CommandGet(cmdID) != nil {
432                 return fmt.Errorf("command id already exist")
433         }
434         xs.cmdList[cmdID] = data
435         return nil
436 }
437
438 // CommandDelete Delete a command from the command list
439 func (xs *XdsServer) CommandDelete(cmdID string) error {
440         if xs.CommandGet(cmdID) == nil {
441                 return fmt.Errorf("unknown command id")
442         }
443         delete(xs.cmdList, cmdID)
444         return nil
445 }
446
447 // CommandGet Retrieve a command data
448 func (xs *XdsServer) CommandGet(cmdID string) interface{} {
449         d, exist := xs.cmdList[cmdID]
450         if exist {
451                 return d
452         }
453         return nil
454 }
455
456 /***
457 ** Private functions
458 ***/
459
460 // Create HTTP client
461 func (xs *XdsServer) _CreateConnectHTTP() error {
462         var err error
463         xs.client, err = common.HTTPNewClient(xs.BaseURL,
464                 common.HTTPClientConfig{
465                         URLPrefix:           "/api/v1",
466                         HeaderClientKeyName: "Xds-Sid",
467                         CsrfDisable:         true,
468                         LogOut:              xs.logOut,
469                         LogPrefix:           "XDSSERVER: ",
470                         LogLevel:            common.HTTPLogLevelWarning,
471                 })
472
473         xs.client.SetLogLevel(xs.Log.Level.String())
474
475         if err != nil {
476                 msg := ": " + err.Error()
477                 if strings.Contains(err.Error(), "connection refused") {
478                         msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
479                 }
480                 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
481         }
482         if xs.client == nil {
483                 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
484         }
485
486         return nil
487 }
488
489 // _Reconnect Re-established connection
490 func (xs *XdsServer) _Reconnect() error {
491         err := xs._Connect(true)
492         if err == nil {
493                 // Reload projects list for this server
494                 err = xs.projects.Init(xs)
495         }
496         return err
497 }
498
499 // _Connect Established HTTP and WS connection and retrieve XDSServer config
500 func (xs *XdsServer) _Connect(reConn bool) error {
501
502         xdsCfg := xsapiv1.APIConfig{}
503         if err := xs.client.Get("/config", &xdsCfg); err != nil {
504                 xs.Connected = false
505                 if !reConn {
506                         xs._NotifyState()
507                 }
508                 return err
509         }
510
511         if reConn && xs.ID != xdsCfg.ServerUID {
512                 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID)
513         }
514
515         // Update local XDS config
516         xs.ID = xdsCfg.ServerUID
517         xs.ServerConfig = &xdsCfg
518
519         // Establish WS connection and register listen
520         if err := xs._SocketConnect(); err != nil {
521                 xs._Disconnected()
522                 return err
523         }
524
525         xs.Connected = true
526         xs._NotifyState()
527         return nil
528 }
529
530 // _SocketConnect Create WebSocket (io.socket) connection
531 func (xs *XdsServer) _SocketConnect() error {
532
533         xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
534
535         opts := &sio_client.Options{
536                 Transport: "websocket",
537                 Header:    make(map[string][]string),
538         }
539         opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
540
541         iosk, err := sio_client.NewClient(xs.BaseURL, opts)
542         if err != nil {
543                 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
544         }
545         xs.ioSock = iosk
546
547         // Register some listeners
548
549         iosk.On("error", func(err error) {
550                 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
551                 if xs.CBOnError != nil {
552                         xs.CBOnError(err)
553                 }
554         })
555
556         iosk.On("disconnection", func(err error) {
557                 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
558                 if xs.CBOnDisconnect != nil {
559                         xs.CBOnDisconnect(err)
560                 }
561                 xs._Disconnected()
562
563                 // Try to reconnect during 15min (or at least while not disabled)
564                 go func() {
565                         count := 0
566                         waitTime := 1
567                         for !xs.Disabled && !xs.Connected {
568                                 count++
569                                 if count%60 == 0 {
570                                         waitTime *= 5
571                                 }
572                                 if waitTime > 15*60 {
573                                         xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
574                                         return
575                                 }
576                                 time.Sleep(time.Second * time.Duration(waitTime))
577                                 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
578
579                                 err := xs._Reconnect()
580                                 if err != nil &&
581                                         !(strings.Contains(err.Error(), "dial tcp") && strings.Contains(err.Error(), "connection refused")) {
582                                         xs.Log.Errorf("ERROR while reconnecting: %v", err.Error())
583                                 }
584
585                         }
586                 }()
587         })
588
589         // XXX - There is no connection event generated so, just consider that
590         // we are connected when NewClient return successfully
591         /* iosk.On("connection", func() { ... }) */
592         xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
593
594         return nil
595 }
596
597 // _Disconnected Set XDS Server as disconnected
598 func (xs *XdsServer) _Disconnected() error {
599         // Clear all register events as socket is closed
600         for k := range xs.sockEvents {
601                 delete(xs.sockEvents, k)
602         }
603         xs.Connected = false
604         xs.ioSock = nil
605         xs._NotifyState()
606         return nil
607 }
608
609 // _NotifyState Send event to notify changes
610 func (xs *XdsServer) _NotifyState() {
611
612         evSts := xaapiv1.ServerCfg{
613                 ID:         xs.ID,
614                 URL:        xs.BaseURL,
615                 APIURL:     xs.APIURL,
616                 PartialURL: xs.PartialURL,
617                 ConnRetry:  xs.ConnRetry,
618                 Connected:  xs.Connected,
619         }
620         if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil {
621                 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
622         }
623 }