Fixed events definition and callback processing.
[src/xds/xds-agent.git] / lib / agent / xdsserver.go
1 package agent
2
3 import (
4         "encoding/json"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "net/http"
9         "strings"
10         "sync"
11         "time"
12
13         "github.com/gin-gonic/gin"
14         "github.com/iotbzh/xds-agent/lib/apiv1"
15         "github.com/iotbzh/xds-agent/lib/xdsconfig"
16         common "github.com/iotbzh/xds-common/golib"
17         uuid "github.com/satori/go.uuid"
18         sio_client "github.com/sebd71/go-socket.io-client"
19 )
20
21 // XdsServer .
22 type XdsServer struct {
23         *Context
24         ID           string
25         BaseURL      string
26         APIURL       string
27         PartialURL   string
28         ConnRetry    int
29         Connected    bool
30         Disabled     bool
31         ServerConfig *XdsServerConfig
32
33         // Events management
34         CBOnError      func(error)
35         CBOnDisconnect func(error)
36         sockEvents     map[string][]*caller
37         sockEventsLock *sync.Mutex
38
39         // Private fields
40         client    *common.HTTPClient
41         ioSock    *sio_client.Client
42         logOut    io.Writer
43         apiRouter *gin.RouterGroup
44 }
45
46 // XdsServerConfig Data return by GET /config
47 type XdsServerConfig struct {
48         ID               string           `json:"id"`
49         Version          string           `json:"version"`
50         APIVersion       string           `json:"apiVersion"`
51         VersionGitTag    string           `json:"gitTag"`
52         SupportedSharing map[string]bool  `json:"supportedSharing"`
53         Builder          XdsBuilderConfig `json:"builder"`
54 }
55
56 // XdsBuilderConfig represents the builder container configuration
57 type XdsBuilderConfig struct {
58         IP          string `json:"ip"`
59         Port        string `json:"port"`
60         SyncThingID string `json:"syncThingID"`
61 }
62
63 // XdsFolderType XdsServer folder type
64 type XdsFolderType string
65
66 const (
67         XdsTypePathMap   = "PathMap"
68         XdsTypeCloudSync = "CloudSync"
69         XdsTypeCifsSmb   = "CIFS"
70 )
71
72 // XdsFolderConfig XdsServer folder config
73 type XdsFolderConfig struct {
74         ID         string        `json:"id"`
75         Label      string        `json:"label"`
76         ClientPath string        `json:"path"`
77         Type       XdsFolderType `json:"type"`
78         Status     string        `json:"status"`
79         IsInSync   bool          `json:"isInSync"`
80         DefaultSdk string        `json:"defaultSdk"`
81         // Specific data depending on which Type is used
82         DataPathMap   XdsPathMapConfig   `json:"dataPathMap,omitempty"`
83         DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"`
84 }
85
86 // XdsPathMapConfig Path mapping specific data
87 type XdsPathMapConfig struct {
88         ServerPath   string `json:"serverPath"`
89         CheckFile    string `json:"checkFile"`
90         CheckContent string `json:"checkContent"`
91 }
92
93 // XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data
94 type XdsCloudSyncConfig struct {
95         SyncThingID   string `json:"syncThingID"`
96         STSvrStatus   string `json:"-"`
97         STSvrIsInSync bool   `json:"-"`
98         STLocStatus   string `json:"-"`
99         STLocIsInSync bool   `json:"-"`
100 }
101
102 // XdsEventRegisterArgs arguments used to register to XDS server events
103 type XdsEventRegisterArgs struct {
104         Name      string `json:"name"`
105         ProjectID string `json:"filterProjectID"`
106 }
107
108 // XdsEventFolderChange Folder change event structure
109 type XdsEventFolderChange struct {
110         Time   string          `json:"time"`
111         Type   string          `json:"type"`
112         Folder XdsFolderConfig `json:"folder"`
113 }
114
115 // Event emitter callback
116 type EventCB func(privData interface{}, evtData interface{}) error
117
118 // caller Used to chain event listeners
119 type caller struct {
120         id          uuid.UUID
121         EventName   string
122         Func        EventCB
123         PrivateData interface{}
124 }
125
126 const _IDTempoPrefix = "tempo-"
127
128 // NewXdsServer creates an instance of XdsServer
129 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
130         return &XdsServer{
131                 Context:    ctx,
132                 ID:         _IDTempoPrefix + uuid.NewV1().String(),
133                 BaseURL:    conf.URL,
134                 APIURL:     conf.APIBaseURL + conf.APIPartialURL,
135                 PartialURL: conf.APIPartialURL,
136                 ConnRetry:  conf.ConnRetry,
137                 Connected:  false,
138                 Disabled:   false,
139
140                 sockEvents:     make(map[string][]*caller),
141                 sockEventsLock: &sync.Mutex{},
142                 logOut:         ctx.Log.Out,
143         }
144 }
145
146 // Close Free and close XDS Server connection
147 func (xs *XdsServer) Close() error {
148         xs.Connected = false
149         xs.Disabled = true
150         xs.ioSock = nil
151         xs._NotifyState()
152         return nil
153 }
154
155 // Connect Establish HTTP connection with XDS Server
156 func (xs *XdsServer) Connect() error {
157         var err error
158         var retry int
159
160         xs.Disabled = false
161         xs.Connected = false
162
163         err = nil
164         for retry = xs.ConnRetry; retry > 0; retry-- {
165                 if err = xs._CreateConnectHTTP(); err == nil {
166                         break
167                 }
168                 if retry == xs.ConnRetry {
169                         // Notify only on the first conn error
170                         // doing that avoid 2 notifs (conn false; conn true) on startup
171                         xs._NotifyState()
172                 }
173                 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
174                 time.Sleep(time.Second)
175         }
176         if retry == 0 {
177                 // FIXME: re-use _reconnect to wait longer in background
178                 return fmt.Errorf("Connection to XDS Server failure")
179         }
180         if err != nil {
181                 return err
182         }
183
184         // Check HTTP connection and establish WS connection
185         err = xs._connect(false)
186
187         return err
188 }
189
190 // IsTempoID returns true when server as a temporary id
191 func (xs *XdsServer) IsTempoID() bool {
192         return strings.HasPrefix(xs.ID, _IDTempoPrefix)
193 }
194
195 // SetLoggerOutput Set logger ou
196 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
197         xs.logOut = out
198 }
199
200 // SendCommand Send a command to XDS Server
201 func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) {
202         url := cmd
203         if !strings.HasPrefix("/", cmd) {
204                 url = "/" + cmd
205         }
206         return xs.client.HTTPPostWithRes(url, string(body))
207 }
208
209 // GetVersion Send Get request to retrieve XDS Server version
210 func (xs *XdsServer) GetVersion(res interface{}) error {
211         return xs._HTTPGet("/version", &res)
212 }
213
214 // GetFolders Send GET request to get current folder configuration
215 func (xs *XdsServer) GetFolders(folders *[]XdsFolderConfig) error {
216         return xs._HTTPGet("/folders", folders)
217 }
218
219 // FolderAdd Send POST request to add a folder
220 func (xs *XdsServer) FolderAdd(fld *XdsFolderConfig, res interface{}) error {
221         response, err := xs._HTTPPost("/folders", fld)
222         if err != nil {
223                 return err
224         }
225         if response.StatusCode != 200 {
226                 return fmt.Errorf("FolderAdd error status=%s", response.Status)
227         }
228         // Result is a XdsFolderConfig that is equivalent to ProjectConfig
229         err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
230
231         return err
232 }
233
234 // FolderDelete Send DELETE request to delete a folder
235 func (xs *XdsServer) FolderDelete(id string) error {
236         return xs.client.HTTPDelete("/folders/" + id)
237 }
238
239 // FolderSync Send POST request to force synchronization of a folder
240 func (xs *XdsServer) FolderSync(id string) error {
241         return xs.client.HTTPPost("/folders/sync/"+id, "")
242 }
243
244 // SetAPIRouterGroup .
245 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
246         xs.apiRouter = r
247 }
248
249 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
250 func (xs *XdsServer) PassthroughGet(url string) {
251         if xs.apiRouter == nil {
252                 xs.Log.Errorf("apiRouter not set !")
253                 return
254         }
255
256         xs.apiRouter.GET(url, func(c *gin.Context) {
257                 var data interface{}
258                 // Take care of param (eg. id in /projects/:id)
259                 nURL := url
260                 if strings.Contains(url, ":") {
261                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
262                 }
263                 // Send Get request
264                 if err := xs._HTTPGet(nURL, &data); err != nil {
265                         if strings.Contains(err.Error(), "connection refused") {
266                                 xs.Connected = false
267                                 xs._NotifyState()
268                         }
269                         common.APIError(c, err.Error())
270                         return
271                 }
272
273                 c.JSON(http.StatusOK, data)
274         })
275 }
276
277 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
278 func (xs *XdsServer) PassthroughPost(url string) {
279         if xs.apiRouter == nil {
280                 xs.Log.Errorf("apiRouter not set !")
281                 return
282         }
283
284         xs.apiRouter.POST(url, func(c *gin.Context) {
285                 bodyReq := []byte{}
286                 n, err := c.Request.Body.Read(bodyReq)
287                 if err != nil {
288                         common.APIError(c, err.Error())
289                         return
290                 }
291
292                 // Take care of param (eg. id in /projects/:id)
293                 nURL := url
294                 if strings.Contains(url, ":") {
295                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
296                 }
297                 // Send Post request
298                 response, err := xs._HTTPPost(nURL, bodyReq[:n])
299                 if err != nil {
300                         common.APIError(c, err.Error())
301                         return
302                 }
303                 bodyRes, err := ioutil.ReadAll(response.Body)
304                 if err != nil {
305                         common.APIError(c, "Cannot read response body")
306                         return
307                 }
308                 c.JSON(http.StatusOK, string(bodyRes))
309         })
310 }
311
312 // EventRegister Post a request to register to an XdsServer event
313 func (xs *XdsServer) EventRegister(evName string, id string) error {
314         var err error
315         _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{
316                 Name:      evName,
317                 ProjectID: id,
318         })
319         return err
320 }
321
322 // EventOn Register a callback on events reception
323 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
324         if xs.ioSock == nil {
325                 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
326         }
327
328         xs.sockEventsLock.Lock()
329         defer xs.sockEventsLock.Unlock()
330
331         if _, exist := xs.sockEvents[evName]; !exist {
332                 // Register listener only the first time
333                 evn := evName
334
335                 // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange
336                 var err error
337                 if evName == "event:FolderStateChanged" {
338                         err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
339                                 xs.sockEventsLock.Lock()
340                                 sEvts := make([]*caller, len(xs.sockEvents[evn]))
341                                 copy(sEvts, xs.sockEvents[evn])
342                                 xs.sockEventsLock.Unlock()
343                                 for _, c := range sEvts {
344                                         c.Func(c.PrivateData, data)
345                                 }
346                                 return nil
347                         })
348                 } else {
349                         err = xs.ioSock.On(evn, func(data interface{}) error {
350                                 xs.sockEventsLock.Lock()
351                                 sEvts := make([]*caller, len(xs.sockEvents[evn]))
352                                 copy(sEvts, xs.sockEvents[evn])
353                                 xs.sockEventsLock.Unlock()
354                                 for _, c := range sEvts {
355                                         c.Func(c.PrivateData, data)
356                                 }
357                                 return nil
358                         })
359                 }
360                 if err != nil {
361                         return uuid.Nil, err
362                 }
363         }
364
365         c := &caller{
366                 id:          uuid.NewV1(),
367                 EventName:   evName,
368                 Func:        f,
369                 PrivateData: privData,
370         }
371
372         xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
373         return c.id, nil
374 }
375
376 // EventOff Un-register a (or all) callbacks associated to an event
377 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
378         xs.sockEventsLock.Lock()
379         defer xs.sockEventsLock.Unlock()
380         if _, exist := xs.sockEvents[evName]; exist {
381                 if id == uuid.Nil {
382                         // Un-register all
383                         xs.sockEvents[evName] = []*caller{}
384                 } else {
385                         // Un-register only the specified callback
386                         for i, ff := range xs.sockEvents[evName] {
387                                 if uuid.Equal(ff.id, id) {
388                                         xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
389                                         break
390                                 }
391                         }
392                 }
393         }
394         return nil
395 }
396
397 // ProjectToFolder Convert Project structure to Folder structure
398 func (xs *XdsServer) ProjectToFolder(pPrj apiv1.ProjectConfig) *XdsFolderConfig {
399         stID := ""
400         if pPrj.Type == XdsTypeCloudSync {
401                 stID, _ = xs.SThg.IDGet()
402         }
403         fPrj := XdsFolderConfig{
404                 ID:         pPrj.ID,
405                 Label:      pPrj.Label,
406                 ClientPath: pPrj.ClientPath,
407                 Type:       XdsFolderType(pPrj.Type),
408                 Status:     pPrj.Status,
409                 IsInSync:   pPrj.IsInSync,
410                 DefaultSdk: pPrj.DefaultSdk,
411                 DataPathMap: XdsPathMapConfig{
412                         ServerPath: pPrj.ServerPath,
413                 },
414                 DataCloudSync: XdsCloudSyncConfig{
415                         SyncThingID:   stID,
416                         STLocIsInSync: pPrj.IsInSync,
417                         STLocStatus:   pPrj.Status,
418                         STSvrIsInSync: pPrj.IsInSync,
419                         STSvrStatus:   pPrj.Status,
420                 },
421         }
422
423         return &fPrj
424 }
425
426 // FolderToProject Convert Folder structure to Project structure
427 func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) apiv1.ProjectConfig {
428         inSync := fPrj.IsInSync
429         sts := fPrj.Status
430
431         if fPrj.Type == XdsTypeCloudSync {
432                 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
433
434                 sts = fPrj.DataCloudSync.STSvrStatus
435                 switch fPrj.DataCloudSync.STLocStatus {
436                 case apiv1.StatusErrorConfig, apiv1.StatusDisable, apiv1.StatusPause:
437                         sts = fPrj.DataCloudSync.STLocStatus
438                         break
439                 case apiv1.StatusSyncing:
440                         if sts != apiv1.StatusErrorConfig && sts != apiv1.StatusDisable && sts != apiv1.StatusPause {
441                                 sts = apiv1.StatusSyncing
442                         }
443                         break
444                 case apiv1.StatusEnable:
445                         // keep STSvrStatus
446                         break
447                 }
448         }
449
450         pPrj := apiv1.ProjectConfig{
451                 ID:         fPrj.ID,
452                 ServerID:   xs.ID,
453                 Label:      fPrj.Label,
454                 ClientPath: fPrj.ClientPath,
455                 ServerPath: fPrj.DataPathMap.ServerPath,
456                 Type:       apiv1.ProjectType(fPrj.Type),
457                 Status:     sts,
458                 IsInSync:   inSync,
459                 DefaultSdk: fPrj.DefaultSdk,
460         }
461         return pPrj
462 }
463
464 /***
465 ** Private functions
466 ***/
467
468 // Create HTTP client
469 func (xs *XdsServer) _CreateConnectHTTP() error {
470         var err error
471         xs.client, err = common.HTTPNewClient(xs.BaseURL,
472                 common.HTTPClientConfig{
473                         URLPrefix:           "/api/v1",
474                         HeaderClientKeyName: "Xds-Sid",
475                         CsrfDisable:         true,
476                         LogOut:              xs.logOut,
477                         LogPrefix:           "XDSSERVER: ",
478                         LogLevel:            common.HTTPLogLevelWarning,
479                 })
480
481         xs.client.SetLogLevel(xs.Log.Level.String())
482
483         if err != nil {
484                 msg := ": " + err.Error()
485                 if strings.Contains(err.Error(), "connection refused") {
486                         msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
487                 }
488                 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
489         }
490         if xs.client == nil {
491                 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
492         }
493
494         return nil
495 }
496
497 // _HTTPGet .
498 func (xs *XdsServer) _HTTPGet(url string, data interface{}) error {
499         var dd []byte
500         if err := xs.client.HTTPGet(url, &dd); err != nil {
501                 return err
502         }
503         return json.Unmarshal(dd, &data)
504 }
505
506 // _HTTPPost .
507 func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) {
508         body, err := json.Marshal(data)
509         if err != nil {
510                 return nil, err
511         }
512         return xs.client.HTTPPostWithRes(url, string(body))
513 }
514
515 //  Re-established connection
516 func (xs *XdsServer) _reconnect() error {
517         err := xs._connect(true)
518         if err == nil {
519                 // Reload projects list for this server
520                 err = xs.projects.Init(xs)
521         }
522         return err
523 }
524
525 //  Established HTTP and WS connection and retrieve XDSServer config
526 func (xs *XdsServer) _connect(reConn bool) error {
527
528         xdsCfg := XdsServerConfig{}
529         if err := xs._HTTPGet("/config", &xdsCfg); err != nil {
530                 xs.Connected = false
531                 if !reConn {
532                         xs._NotifyState()
533                 }
534                 return err
535         }
536
537         if reConn && xs.ID != xdsCfg.ID {
538                 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
539         }
540
541         // Update local XDS config
542         xs.ID = xdsCfg.ID
543         xs.ServerConfig = &xdsCfg
544
545         // Establish WS connection and register listen
546         if err := xs._SocketConnect(); err != nil {
547                 xs.Connected = false
548                 xs._NotifyState()
549                 return err
550         }
551
552         xs.Connected = true
553         xs._NotifyState()
554         return nil
555 }
556
557 // Create WebSocket (io.socket) connection
558 func (xs *XdsServer) _SocketConnect() error {
559
560         xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
561
562         opts := &sio_client.Options{
563                 Transport: "websocket",
564                 Header:    make(map[string][]string),
565         }
566         opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
567
568         iosk, err := sio_client.NewClient(xs.BaseURL, opts)
569         if err != nil {
570                 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
571         }
572         xs.ioSock = iosk
573
574         // Register some listeners
575
576         iosk.On("error", func(err error) {
577                 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
578                 if xs.CBOnError != nil {
579                         xs.CBOnError(err)
580                 }
581         })
582
583         iosk.On("disconnection", func(err error) {
584                 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
585                 if xs.CBOnDisconnect != nil {
586                         xs.CBOnDisconnect(err)
587                 }
588                 xs.Connected = false
589                 xs._NotifyState()
590
591                 // Try to reconnect during 15min (or at least while not disabled)
592                 go func() {
593                         count := 0
594                         waitTime := 1
595                         for !xs.Disabled && !xs.Connected {
596                                 count++
597                                 if count%60 == 0 {
598                                         waitTime *= 5
599                                 }
600                                 if waitTime > 15*60 {
601                                         xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
602                                         return
603                                 }
604                                 time.Sleep(time.Second * time.Duration(waitTime))
605                                 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
606
607                                 xs._reconnect()
608                         }
609                 }()
610         })
611
612         // XXX - There is no connection event generated so, just consider that
613         // we are connected when NewClient return successfully
614         /* iosk.On("connection", func() { ... }) */
615         xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
616
617         return nil
618 }
619
620 // Send event to notify changes
621 func (xs *XdsServer) _NotifyState() {
622
623         evSts := apiv1.ServerCfg{
624                 ID:         xs.ID,
625                 URL:        xs.BaseURL,
626                 APIURL:     xs.APIURL,
627                 PartialURL: xs.PartialURL,
628                 ConnRetry:  xs.ConnRetry,
629                 Connected:  xs.Connected,
630         }
631         if err := xs.events.Emit(apiv1.EVTServerConfig, evSts); err != nil {
632                 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
633         }
634 }