b76908c0337dc8e6a67e9bff9ba7e4b0dea3d989
[src/xds/xds-agent.git] / lib / agent / xdsserver.go
1 package agent
2
3 import (
4         "encoding/json"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "net/http"
9         "strings"
10         "sync"
11         "time"
12
13         "github.com/gin-gonic/gin"
14         "github.com/iotbzh/xds-agent/lib/xdsconfig"
15         common "github.com/iotbzh/xds-common/golib"
16         uuid "github.com/satori/go.uuid"
17         sio_client "github.com/sebd71/go-socket.io-client"
18 )
19
20 // XdsServer .
21 type XdsServer struct {
22         *Context
23         ID           string
24         BaseURL      string
25         APIURL       string
26         PartialURL   string
27         ConnRetry    int
28         Connected    bool
29         Disabled     bool
30         ServerConfig *XdsServerConfig
31
32         // Events management
33         CBOnError      func(error)
34         CBOnDisconnect func(error)
35         sockEvents     map[string][]*caller
36         sockEventsLock *sync.Mutex
37
38         // Private fields
39         client    *common.HTTPClient
40         ioSock    *sio_client.Client
41         logOut    io.Writer
42         apiRouter *gin.RouterGroup
43 }
44
45 // XdsServerConfig Data return by GET /config
46 type XdsServerConfig struct {
47         ID               string           `json:"id"`
48         Version          string           `json:"version"`
49         APIVersion       string           `json:"apiVersion"`
50         VersionGitTag    string           `json:"gitTag"`
51         SupportedSharing map[string]bool  `json:"supportedSharing"`
52         Builder          XdsBuilderConfig `json:"builder"`
53 }
54
55 // XdsBuilderConfig represents the builder container configuration
56 type XdsBuilderConfig struct {
57         IP          string `json:"ip"`
58         Port        string `json:"port"`
59         SyncThingID string `json:"syncThingID"`
60 }
61
62 // XdsFolderType XdsServer folder type
63 type XdsFolderType string
64
65 const (
66         XdsTypePathMap   = "PathMap"
67         XdsTypeCloudSync = "CloudSync"
68         XdsTypeCifsSmb   = "CIFS"
69 )
70
71 // XdsFolderConfig XdsServer folder config
72 type XdsFolderConfig struct {
73         ID         string        `json:"id"`
74         Label      string        `json:"label"`
75         ClientPath string        `json:"path"`
76         Type       XdsFolderType `json:"type"`
77         Status     string        `json:"status"`
78         IsInSync   bool          `json:"isInSync"`
79         DefaultSdk string        `json:"defaultSdk"`
80         // Specific data depending on which Type is used
81         DataPathMap   XdsPathMapConfig   `json:"dataPathMap,omitempty"`
82         DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"`
83 }
84
85 // XdsPathMapConfig Path mapping specific data
86 type XdsPathMapConfig struct {
87         ServerPath   string `json:"serverPath"`
88         CheckFile    string `json:"checkFile"`
89         CheckContent string `json:"checkContent"`
90 }
91
92 // XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data
93 type XdsCloudSyncConfig struct {
94         SyncThingID   string `json:"syncThingID"`
95         STSvrStatus   string `json:"-"`
96         STSvrIsInSync bool   `json:"-"`
97         STLocStatus   string `json:"-"`
98         STLocIsInSync bool   `json:"-"`
99 }
100
101 // XdsEventRegisterArgs arguments used to register to XDS server events
102 type XdsEventRegisterArgs struct {
103         Name      string `json:"name"`
104         ProjectID string `json:"filterProjectID"`
105 }
106
107 // XdsEventFolderChange Folder change event structure
108 type XdsEventFolderChange struct {
109         Time   string          `json:"time"`
110         Type   string          `json:"type"`
111         Folder XdsFolderConfig `json:"folder"`
112 }
113
114 // caller Used to chain event listeners
115 type caller struct {
116         id        uuid.UUID
117         EventName string
118         Func      func(interface{})
119 }
120
121 const _IDTempoPrefix = "tempo-"
122
123 // NewXdsServer creates an instance of XdsServer
124 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
125         return &XdsServer{
126                 Context:    ctx,
127                 ID:         _IDTempoPrefix + uuid.NewV1().String(),
128                 BaseURL:    conf.URL,
129                 APIURL:     conf.APIBaseURL + conf.APIPartialURL,
130                 PartialURL: conf.APIPartialURL,
131                 ConnRetry:  conf.ConnRetry,
132                 Connected:  false,
133                 Disabled:   false,
134
135                 sockEvents:     make(map[string][]*caller),
136                 sockEventsLock: &sync.Mutex{},
137                 logOut:         ctx.Log.Out,
138         }
139 }
140
141 // Close Free and close XDS Server connection
142 func (xs *XdsServer) Close() error {
143         xs.Connected = false
144         xs.Disabled = true
145         xs.ioSock = nil
146         xs._NotifyState()
147         return nil
148 }
149
150 // Connect Establish HTTP connection with XDS Server
151 func (xs *XdsServer) Connect() error {
152         var err error
153         var retry int
154
155         xs.Disabled = false
156         xs.Connected = false
157
158         err = nil
159         for retry = xs.ConnRetry; retry > 0; retry-- {
160                 if err = xs._CreateConnectHTTP(); err == nil {
161                         break
162                 }
163                 if retry == xs.ConnRetry {
164                         // Notify only on the first conn error
165                         // doing that avoid 2 notifs (conn false; conn true) on startup
166                         xs._NotifyState()
167                 }
168                 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
169                 time.Sleep(time.Second)
170         }
171         if retry == 0 {
172                 // FIXME: re-use _reconnect to wait longer in background
173                 return fmt.Errorf("Connection to XDS Server failure")
174         }
175         if err != nil {
176                 return err
177         }
178
179         // Check HTTP connection and establish WS connection
180         err = xs._connect(false)
181
182         return err
183 }
184
185 // IsTempoID returns true when server as a temporary id
186 func (xs *XdsServer) IsTempoID() bool {
187         return strings.HasPrefix(xs.ID, _IDTempoPrefix)
188 }
189
190 // SetLoggerOutput Set logger ou
191 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
192         xs.logOut = out
193 }
194
195 // SendCommand Send a command to XDS Server
196 func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) {
197         url := cmd
198         if !strings.HasPrefix("/", cmd) {
199                 url = "/" + cmd
200         }
201         return xs.client.HTTPPostWithRes(url, string(body))
202 }
203
204 // GetVersion Send Get request to retrieve XDS Server version
205 func (xs *XdsServer) GetVersion(res interface{}) error {
206         return xs._HTTPGet("/version", &res)
207 }
208
209 // GetFolders Send GET request to get current folder configuration
210 func (xs *XdsServer) GetFolders(folders *[]XdsFolderConfig) error {
211         return xs._HTTPGet("/folders", folders)
212 }
213
214 // FolderAdd Send POST request to add a folder
215 func (xs *XdsServer) FolderAdd(fld *XdsFolderConfig, res interface{}) error {
216         response, err := xs._HTTPPost("/folder", fld)
217         if err != nil {
218                 return err
219         }
220         if response.StatusCode != 200 {
221                 return fmt.Errorf("FolderAdd error status=%s", response.Status)
222         }
223         // Result is a XdsFolderConfig that is equivalent to ProjectConfig
224         err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
225
226         return err
227 }
228
229 // FolderDelete Send DELETE request to delete a folder
230 func (xs *XdsServer) FolderDelete(id string) error {
231         return xs.client.HTTPDelete("/folder/" + id)
232 }
233
234 // FolderSync Send POST request to force synchronization of a folder
235 func (xs *XdsServer) FolderSync(id string) error {
236         return xs.client.HTTPPost("/folder/sync/"+id, "")
237 }
238
239 // SetAPIRouterGroup .
240 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
241         xs.apiRouter = r
242 }
243
244 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
245 func (xs *XdsServer) PassthroughGet(url string) {
246         if xs.apiRouter == nil {
247                 xs.Log.Errorf("apiRouter not set !")
248                 return
249         }
250
251         xs.apiRouter.GET(url, func(c *gin.Context) {
252                 var data interface{}
253                 if err := xs._HTTPGet(url, &data); err != nil {
254                         if strings.Contains(err.Error(), "connection refused") {
255                                 xs.Connected = false
256                                 xs._NotifyState()
257                         }
258                         common.APIError(c, err.Error())
259                         return
260                 }
261
262                 c.JSON(http.StatusOK, data)
263         })
264 }
265
266 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
267 func (xs *XdsServer) PassthroughPost(url string) {
268         if xs.apiRouter == nil {
269                 xs.Log.Errorf("apiRouter not set !")
270                 return
271         }
272
273         xs.apiRouter.POST(url, func(c *gin.Context) {
274                 bodyReq := []byte{}
275                 n, err := c.Request.Body.Read(bodyReq)
276                 if err != nil {
277                         common.APIError(c, err.Error())
278                         return
279                 }
280
281                 response, err := xs._HTTPPost(url, bodyReq[:n])
282                 if err != nil {
283                         common.APIError(c, err.Error())
284                         return
285                 }
286                 bodyRes, err := ioutil.ReadAll(response.Body)
287                 if err != nil {
288                         common.APIError(c, "Cannot read response body")
289                         return
290                 }
291                 c.JSON(http.StatusOK, string(bodyRes))
292         })
293 }
294
295 // EventRegister Post a request to register to an XdsServer event
296 func (xs *XdsServer) EventRegister(evName string, id string) error {
297         var err error
298         _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{
299                 Name:      evName,
300                 ProjectID: id,
301         })
302         return err
303 }
304
305 // EventOn Register a callback on events reception
306 func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, error) {
307         if xs.ioSock == nil {
308                 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
309         }
310
311         xs.sockEventsLock.Lock()
312         defer xs.sockEventsLock.Unlock()
313
314         if _, exist := xs.sockEvents[evName]; !exist {
315                 // Register listener only the first time
316                 evn := evName
317
318                 // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange
319                 var err error
320                 if evName == "event:FolderStateChanged" {
321                         err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
322                                 xs.sockEventsLock.Lock()
323                                 defer xs.sockEventsLock.Unlock()
324                                 for _, c := range xs.sockEvents[evn] {
325                                         c.Func(data)
326                                 }
327                                 return nil
328                         })
329                 } else {
330                         err = xs.ioSock.On(evn, f)
331                 }
332                 if err != nil {
333                         return uuid.Nil, err
334                 }
335         }
336
337         c := &caller{
338                 id:        uuid.NewV1(),
339                 EventName: evName,
340                 Func:      f,
341         }
342
343         xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
344         return c.id, nil
345 }
346
347 // EventOff Un-register a (or all) callbacks associated to an event
348 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
349         xs.sockEventsLock.Lock()
350         defer xs.sockEventsLock.Unlock()
351         if _, exist := xs.sockEvents[evName]; exist {
352                 if id == uuid.Nil {
353                         // Un-register all
354                         xs.sockEvents[evName] = []*caller{}
355                 } else {
356                         // Un-register only the specified callback
357                         for i, ff := range xs.sockEvents[evName] {
358                                 if uuid.Equal(ff.id, id) {
359                                         xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
360                                         break
361                                 }
362                         }
363                 }
364         }
365         return nil
366 }
367
368 // ProjectToFolder Convert Project structure to Folder structure
369 func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *XdsFolderConfig {
370         stID := ""
371         if pPrj.Type == XdsTypeCloudSync {
372                 stID, _ = xs.SThg.IDGet()
373         }
374         fPrj := XdsFolderConfig{
375                 ID:         pPrj.ID,
376                 Label:      pPrj.Label,
377                 ClientPath: pPrj.ClientPath,
378                 Type:       XdsFolderType(pPrj.Type),
379                 Status:     pPrj.Status,
380                 IsInSync:   pPrj.IsInSync,
381                 DefaultSdk: pPrj.DefaultSdk,
382                 DataPathMap: XdsPathMapConfig{
383                         ServerPath: pPrj.ServerPath,
384                 },
385                 DataCloudSync: XdsCloudSyncConfig{
386                         SyncThingID:   stID,
387                         STLocIsInSync: pPrj.IsInSync,
388                         STLocStatus:   pPrj.Status,
389                         STSvrIsInSync: pPrj.IsInSync,
390                         STSvrStatus:   pPrj.Status,
391                 },
392         }
393
394         return &fPrj
395 }
396
397 // FolderToProject Convert Folder structure to Project structure
398 func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) ProjectConfig {
399         inSync := fPrj.IsInSync
400         sts := fPrj.Status
401
402         if fPrj.Type == XdsTypeCloudSync {
403                 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
404
405                 sts = fPrj.DataCloudSync.STSvrStatus
406                 switch fPrj.DataCloudSync.STLocStatus {
407                 case StatusErrorConfig, StatusDisable, StatusPause:
408                         sts = fPrj.DataCloudSync.STLocStatus
409                         break
410                 case StatusSyncing:
411                         if sts != StatusErrorConfig && sts != StatusDisable && sts != StatusPause {
412                                 sts = StatusSyncing
413                         }
414                         break
415                 case StatusEnable:
416                         // keep STSvrStatus
417                         break
418                 }
419         }
420
421         pPrj := ProjectConfig{
422                 ID:         fPrj.ID,
423                 ServerID:   xs.ID,
424                 Label:      fPrj.Label,
425                 ClientPath: fPrj.ClientPath,
426                 ServerPath: fPrj.DataPathMap.ServerPath,
427                 Type:       ProjectType(fPrj.Type),
428                 Status:     sts,
429                 IsInSync:   inSync,
430                 DefaultSdk: fPrj.DefaultSdk,
431         }
432         return pPrj
433 }
434
435 /***
436 ** Private functions
437 ***/
438
439 // Create HTTP client
440 func (xs *XdsServer) _CreateConnectHTTP() error {
441         var err error
442         xs.client, err = common.HTTPNewClient(xs.BaseURL,
443                 common.HTTPClientConfig{
444                         URLPrefix:           "/api/v1",
445                         HeaderClientKeyName: "Xds-Sid",
446                         CsrfDisable:         true,
447                         LogOut:              xs.logOut,
448                         LogPrefix:           "XDSSERVER: ",
449                         LogLevel:            common.HTTPLogLevelWarning,
450                 })
451
452         xs.client.SetLogLevel(xs.Log.Level.String())
453
454         if err != nil {
455                 msg := ": " + err.Error()
456                 if strings.Contains(err.Error(), "connection refused") {
457                         msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
458                 }
459                 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
460         }
461         if xs.client == nil {
462                 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
463         }
464
465         return nil
466 }
467
468 // _HTTPGet .
469 func (xs *XdsServer) _HTTPGet(url string, data interface{}) error {
470         var dd []byte
471         if err := xs.client.HTTPGet(url, &dd); err != nil {
472                 return err
473         }
474         return json.Unmarshal(dd, &data)
475 }
476
477 // _HTTPPost .
478 func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) {
479         body, err := json.Marshal(data)
480         if err != nil {
481                 return nil, err
482         }
483         return xs.client.HTTPPostWithRes(url, string(body))
484 }
485
486 //  Re-established connection
487 func (xs *XdsServer) _reconnect() error {
488         err := xs._connect(true)
489         if err == nil {
490                 // Reload projects list for this server
491                 err = xs.projects.Init(xs)
492         }
493         return err
494 }
495
496 //  Established HTTP and WS connection and retrieve XDSServer config
497 func (xs *XdsServer) _connect(reConn bool) error {
498
499         xdsCfg := XdsServerConfig{}
500         if err := xs._HTTPGet("/config", &xdsCfg); err != nil {
501                 xs.Connected = false
502                 if !reConn {
503                         xs._NotifyState()
504                 }
505                 return err
506         }
507
508         if reConn && xs.ID != xdsCfg.ID {
509                 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
510         }
511
512         // Update local XDS config
513         xs.ID = xdsCfg.ID
514         xs.ServerConfig = &xdsCfg
515
516         // Establish WS connection and register listen
517         if err := xs._SocketConnect(); err != nil {
518                 xs.Connected = false
519                 xs._NotifyState()
520                 return err
521         }
522
523         xs.Connected = true
524         xs._NotifyState()
525         return nil
526 }
527
528 // Create WebSocket (io.socket) connection
529 func (xs *XdsServer) _SocketConnect() error {
530
531         xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
532
533         opts := &sio_client.Options{
534                 Transport: "websocket",
535                 Header:    make(map[string][]string),
536         }
537         opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
538
539         iosk, err := sio_client.NewClient(xs.BaseURL, opts)
540         if err != nil {
541                 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
542         }
543         xs.ioSock = iosk
544
545         // Register some listeners
546
547         iosk.On("error", func(err error) {
548                 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
549                 if xs.CBOnError != nil {
550                         xs.CBOnError(err)
551                 }
552         })
553
554         iosk.On("disconnection", func(err error) {
555                 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
556                 if xs.CBOnDisconnect != nil {
557                         xs.CBOnDisconnect(err)
558                 }
559                 xs.Connected = false
560                 xs._NotifyState()
561
562                 // Try to reconnect during 15min (or at least while not disabled)
563                 go func() {
564                         count := 0
565                         waitTime := 1
566                         for !xs.Disabled && !xs.Connected {
567                                 count++
568                                 if count%60 == 0 {
569                                         waitTime *= 5
570                                 }
571                                 if waitTime > 15*60 {
572                                         xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
573                                         return
574                                 }
575                                 time.Sleep(time.Second * time.Duration(waitTime))
576                                 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
577
578                                 xs._reconnect()
579                         }
580                 }()
581         })
582
583         // XXX - There is no connection event generated so, just consider that
584         // we are connected when NewClient return successfully
585         /* iosk.On("connection", func() { ... }) */
586         xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
587
588         return nil
589 }
590
591 // Send event to notify changes
592 func (xs *XdsServer) _NotifyState() {
593
594         evSts := ServerCfg{
595                 ID:         xs.ID,
596                 URL:        xs.BaseURL,
597                 APIURL:     xs.APIURL,
598                 PartialURL: xs.PartialURL,
599                 ConnRetry:  xs.ConnRetry,
600                 Connected:  xs.Connected,
601         }
602         if err := xs.events.Emit(EVTServerConfig, evSts); err != nil {
603                 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
604         }
605 }