Update api routes name (plural nouns)
[src/xds/xds-agent.git] / lib / agent / xdsserver.go
1 package agent
2
3 import (
4         "encoding/json"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "net/http"
9         "strings"
10         "sync"
11         "time"
12
13         "github.com/gin-gonic/gin"
14         "github.com/iotbzh/xds-agent/lib/apiv1"
15         "github.com/iotbzh/xds-agent/lib/xdsconfig"
16         common "github.com/iotbzh/xds-common/golib"
17         uuid "github.com/satori/go.uuid"
18         sio_client "github.com/sebd71/go-socket.io-client"
19 )
20
21 // XdsServer .
22 type XdsServer struct {
23         *Context
24         ID           string
25         BaseURL      string
26         APIURL       string
27         PartialURL   string
28         ConnRetry    int
29         Connected    bool
30         Disabled     bool
31         ServerConfig *XdsServerConfig
32
33         // Events management
34         CBOnError      func(error)
35         CBOnDisconnect func(error)
36         sockEvents     map[string][]*caller
37         sockEventsLock *sync.Mutex
38
39         // Private fields
40         client    *common.HTTPClient
41         ioSock    *sio_client.Client
42         logOut    io.Writer
43         apiRouter *gin.RouterGroup
44 }
45
46 // XdsServerConfig Data return by GET /config
47 type XdsServerConfig struct {
48         ID               string           `json:"id"`
49         Version          string           `json:"version"`
50         APIVersion       string           `json:"apiVersion"`
51         VersionGitTag    string           `json:"gitTag"`
52         SupportedSharing map[string]bool  `json:"supportedSharing"`
53         Builder          XdsBuilderConfig `json:"builder"`
54 }
55
56 // XdsBuilderConfig represents the builder container configuration
57 type XdsBuilderConfig struct {
58         IP          string `json:"ip"`
59         Port        string `json:"port"`
60         SyncThingID string `json:"syncThingID"`
61 }
62
63 // XdsFolderType XdsServer folder type
64 type XdsFolderType string
65
66 const (
67         XdsTypePathMap   = "PathMap"
68         XdsTypeCloudSync = "CloudSync"
69         XdsTypeCifsSmb   = "CIFS"
70 )
71
72 // XdsFolderConfig XdsServer folder config
73 type XdsFolderConfig struct {
74         ID         string        `json:"id"`
75         Label      string        `json:"label"`
76         ClientPath string        `json:"path"`
77         Type       XdsFolderType `json:"type"`
78         Status     string        `json:"status"`
79         IsInSync   bool          `json:"isInSync"`
80         DefaultSdk string        `json:"defaultSdk"`
81         // Specific data depending on which Type is used
82         DataPathMap   XdsPathMapConfig   `json:"dataPathMap,omitempty"`
83         DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"`
84 }
85
86 // XdsPathMapConfig Path mapping specific data
87 type XdsPathMapConfig struct {
88         ServerPath   string `json:"serverPath"`
89         CheckFile    string `json:"checkFile"`
90         CheckContent string `json:"checkContent"`
91 }
92
93 // XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data
94 type XdsCloudSyncConfig struct {
95         SyncThingID   string `json:"syncThingID"`
96         STSvrStatus   string `json:"-"`
97         STSvrIsInSync bool   `json:"-"`
98         STLocStatus   string `json:"-"`
99         STLocIsInSync bool   `json:"-"`
100 }
101
102 // XdsEventRegisterArgs arguments used to register to XDS server events
103 type XdsEventRegisterArgs struct {
104         Name      string `json:"name"`
105         ProjectID string `json:"filterProjectID"`
106 }
107
108 // XdsEventFolderChange Folder change event structure
109 type XdsEventFolderChange struct {
110         Time   string          `json:"time"`
111         Type   string          `json:"type"`
112         Folder XdsFolderConfig `json:"folder"`
113 }
114
115 // caller Used to chain event listeners
116 type caller struct {
117         id        uuid.UUID
118         EventName string
119         Func      func(interface{})
120 }
121
122 const _IDTempoPrefix = "tempo-"
123
124 // NewXdsServer creates an instance of XdsServer
125 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
126         return &XdsServer{
127                 Context:    ctx,
128                 ID:         _IDTempoPrefix + uuid.NewV1().String(),
129                 BaseURL:    conf.URL,
130                 APIURL:     conf.APIBaseURL + conf.APIPartialURL,
131                 PartialURL: conf.APIPartialURL,
132                 ConnRetry:  conf.ConnRetry,
133                 Connected:  false,
134                 Disabled:   false,
135
136                 sockEvents:     make(map[string][]*caller),
137                 sockEventsLock: &sync.Mutex{},
138                 logOut:         ctx.Log.Out,
139         }
140 }
141
142 // Close Free and close XDS Server connection
143 func (xs *XdsServer) Close() error {
144         xs.Connected = false
145         xs.Disabled = true
146         xs.ioSock = nil
147         xs._NotifyState()
148         return nil
149 }
150
151 // Connect Establish HTTP connection with XDS Server
152 func (xs *XdsServer) Connect() error {
153         var err error
154         var retry int
155
156         xs.Disabled = false
157         xs.Connected = false
158
159         err = nil
160         for retry = xs.ConnRetry; retry > 0; retry-- {
161                 if err = xs._CreateConnectHTTP(); err == nil {
162                         break
163                 }
164                 if retry == xs.ConnRetry {
165                         // Notify only on the first conn error
166                         // doing that avoid 2 notifs (conn false; conn true) on startup
167                         xs._NotifyState()
168                 }
169                 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
170                 time.Sleep(time.Second)
171         }
172         if retry == 0 {
173                 // FIXME: re-use _reconnect to wait longer in background
174                 return fmt.Errorf("Connection to XDS Server failure")
175         }
176         if err != nil {
177                 return err
178         }
179
180         // Check HTTP connection and establish WS connection
181         err = xs._connect(false)
182
183         return err
184 }
185
186 // IsTempoID returns true when server as a temporary id
187 func (xs *XdsServer) IsTempoID() bool {
188         return strings.HasPrefix(xs.ID, _IDTempoPrefix)
189 }
190
191 // SetLoggerOutput Set logger ou
192 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
193         xs.logOut = out
194 }
195
196 // SendCommand Send a command to XDS Server
197 func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) {
198         url := cmd
199         if !strings.HasPrefix("/", cmd) {
200                 url = "/" + cmd
201         }
202         return xs.client.HTTPPostWithRes(url, string(body))
203 }
204
205 // GetVersion Send Get request to retrieve XDS Server version
206 func (xs *XdsServer) GetVersion(res interface{}) error {
207         return xs._HTTPGet("/version", &res)
208 }
209
210 // GetFolders Send GET request to get current folder configuration
211 func (xs *XdsServer) GetFolders(folders *[]XdsFolderConfig) error {
212         return xs._HTTPGet("/folders", folders)
213 }
214
215 // FolderAdd Send POST request to add a folder
216 func (xs *XdsServer) FolderAdd(fld *XdsFolderConfig, res interface{}) error {
217         response, err := xs._HTTPPost("/folders", fld)
218         if err != nil {
219                 return err
220         }
221         if response.StatusCode != 200 {
222                 return fmt.Errorf("FolderAdd error status=%s", response.Status)
223         }
224         // Result is a XdsFolderConfig that is equivalent to ProjectConfig
225         err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
226
227         return err
228 }
229
230 // FolderDelete Send DELETE request to delete a folder
231 func (xs *XdsServer) FolderDelete(id string) error {
232         return xs.client.HTTPDelete("/folders/" + id)
233 }
234
235 // FolderSync Send POST request to force synchronization of a folder
236 func (xs *XdsServer) FolderSync(id string) error {
237         return xs.client.HTTPPost("/folders/sync/"+id, "")
238 }
239
240 // SetAPIRouterGroup .
241 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
242         xs.apiRouter = r
243 }
244
245 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
246 func (xs *XdsServer) PassthroughGet(url string) {
247         if xs.apiRouter == nil {
248                 xs.Log.Errorf("apiRouter not set !")
249                 return
250         }
251
252         xs.apiRouter.GET(url, func(c *gin.Context) {
253                 var data interface{}
254                 // Take care of param (eg. id in /projects/:id)
255                 nURL := url
256                 if strings.Contains(url, ":") {
257                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
258                 }
259                 // Send Get request
260                 if err := xs._HTTPGet(nURL, &data); err != nil {
261                         if strings.Contains(err.Error(), "connection refused") {
262                                 xs.Connected = false
263                                 xs._NotifyState()
264                         }
265                         common.APIError(c, err.Error())
266                         return
267                 }
268
269                 c.JSON(http.StatusOK, data)
270         })
271 }
272
273 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
274 func (xs *XdsServer) PassthroughPost(url string) {
275         if xs.apiRouter == nil {
276                 xs.Log.Errorf("apiRouter not set !")
277                 return
278         }
279
280         xs.apiRouter.POST(url, func(c *gin.Context) {
281                 bodyReq := []byte{}
282                 n, err := c.Request.Body.Read(bodyReq)
283                 if err != nil {
284                         common.APIError(c, err.Error())
285                         return
286                 }
287
288                 // Take care of param (eg. id in /projects/:id)
289                 nURL := url
290                 if strings.Contains(url, ":") {
291                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
292                 }
293                 // Send Post request
294                 response, err := xs._HTTPPost(nURL, bodyReq[:n])
295                 if err != nil {
296                         common.APIError(c, err.Error())
297                         return
298                 }
299                 bodyRes, err := ioutil.ReadAll(response.Body)
300                 if err != nil {
301                         common.APIError(c, "Cannot read response body")
302                         return
303                 }
304                 c.JSON(http.StatusOK, string(bodyRes))
305         })
306 }
307
308 // EventRegister Post a request to register to an XdsServer event
309 func (xs *XdsServer) EventRegister(evName string, id string) error {
310         var err error
311         _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{
312                 Name:      evName,
313                 ProjectID: id,
314         })
315         return err
316 }
317
318 // EventOn Register a callback on events reception
319 func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, error) {
320         if xs.ioSock == nil {
321                 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
322         }
323
324         xs.sockEventsLock.Lock()
325         defer xs.sockEventsLock.Unlock()
326
327         if _, exist := xs.sockEvents[evName]; !exist {
328                 // Register listener only the first time
329                 evn := evName
330
331                 // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange
332                 var err error
333                 if evName == "event:FolderStateChanged" {
334                         err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
335                                 xs.sockEventsLock.Lock()
336                                 defer xs.sockEventsLock.Unlock()
337                                 for _, c := range xs.sockEvents[evn] {
338                                         c.Func(data)
339                                 }
340                                 return nil
341                         })
342                 } else {
343                         err = xs.ioSock.On(evn, f)
344                 }
345                 if err != nil {
346                         return uuid.Nil, err
347                 }
348         }
349
350         c := &caller{
351                 id:        uuid.NewV1(),
352                 EventName: evName,
353                 Func:      f,
354         }
355
356         xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
357         return c.id, nil
358 }
359
360 // EventOff Un-register a (or all) callbacks associated to an event
361 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
362         xs.sockEventsLock.Lock()
363         defer xs.sockEventsLock.Unlock()
364         if _, exist := xs.sockEvents[evName]; exist {
365                 if id == uuid.Nil {
366                         // Un-register all
367                         xs.sockEvents[evName] = []*caller{}
368                 } else {
369                         // Un-register only the specified callback
370                         for i, ff := range xs.sockEvents[evName] {
371                                 if uuid.Equal(ff.id, id) {
372                                         xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
373                                         break
374                                 }
375                         }
376                 }
377         }
378         return nil
379 }
380
381 // ProjectToFolder Convert Project structure to Folder structure
382 func (xs *XdsServer) ProjectToFolder(pPrj apiv1.ProjectConfig) *XdsFolderConfig {
383         stID := ""
384         if pPrj.Type == XdsTypeCloudSync {
385                 stID, _ = xs.SThg.IDGet()
386         }
387         fPrj := XdsFolderConfig{
388                 ID:         pPrj.ID,
389                 Label:      pPrj.Label,
390                 ClientPath: pPrj.ClientPath,
391                 Type:       XdsFolderType(pPrj.Type),
392                 Status:     pPrj.Status,
393                 IsInSync:   pPrj.IsInSync,
394                 DefaultSdk: pPrj.DefaultSdk,
395                 DataPathMap: XdsPathMapConfig{
396                         ServerPath: pPrj.ServerPath,
397                 },
398                 DataCloudSync: XdsCloudSyncConfig{
399                         SyncThingID:   stID,
400                         STLocIsInSync: pPrj.IsInSync,
401                         STLocStatus:   pPrj.Status,
402                         STSvrIsInSync: pPrj.IsInSync,
403                         STSvrStatus:   pPrj.Status,
404                 },
405         }
406
407         return &fPrj
408 }
409
410 // FolderToProject Convert Folder structure to Project structure
411 func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) apiv1.ProjectConfig {
412         inSync := fPrj.IsInSync
413         sts := fPrj.Status
414
415         if fPrj.Type == XdsTypeCloudSync {
416                 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
417
418                 sts = fPrj.DataCloudSync.STSvrStatus
419                 switch fPrj.DataCloudSync.STLocStatus {
420                 case apiv1.StatusErrorConfig, apiv1.StatusDisable, apiv1.StatusPause:
421                         sts = fPrj.DataCloudSync.STLocStatus
422                         break
423                 case apiv1.StatusSyncing:
424                         if sts != apiv1.StatusErrorConfig && sts != apiv1.StatusDisable && sts != apiv1.StatusPause {
425                                 sts = apiv1.StatusSyncing
426                         }
427                         break
428                 case apiv1.StatusEnable:
429                         // keep STSvrStatus
430                         break
431                 }
432         }
433
434         pPrj := apiv1.ProjectConfig{
435                 ID:         fPrj.ID,
436                 ServerID:   xs.ID,
437                 Label:      fPrj.Label,
438                 ClientPath: fPrj.ClientPath,
439                 ServerPath: fPrj.DataPathMap.ServerPath,
440                 Type:       apiv1.ProjectType(fPrj.Type),
441                 Status:     sts,
442                 IsInSync:   inSync,
443                 DefaultSdk: fPrj.DefaultSdk,
444         }
445         return pPrj
446 }
447
448 /***
449 ** Private functions
450 ***/
451
452 // Create HTTP client
453 func (xs *XdsServer) _CreateConnectHTTP() error {
454         var err error
455         xs.client, err = common.HTTPNewClient(xs.BaseURL,
456                 common.HTTPClientConfig{
457                         URLPrefix:           "/api/v1",
458                         HeaderClientKeyName: "Xds-Sid",
459                         CsrfDisable:         true,
460                         LogOut:              xs.logOut,
461                         LogPrefix:           "XDSSERVER: ",
462                         LogLevel:            common.HTTPLogLevelWarning,
463                 })
464
465         xs.client.SetLogLevel(xs.Log.Level.String())
466
467         if err != nil {
468                 msg := ": " + err.Error()
469                 if strings.Contains(err.Error(), "connection refused") {
470                         msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
471                 }
472                 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
473         }
474         if xs.client == nil {
475                 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
476         }
477
478         return nil
479 }
480
481 // _HTTPGet .
482 func (xs *XdsServer) _HTTPGet(url string, data interface{}) error {
483         var dd []byte
484         if err := xs.client.HTTPGet(url, &dd); err != nil {
485                 return err
486         }
487         return json.Unmarshal(dd, &data)
488 }
489
490 // _HTTPPost .
491 func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) {
492         body, err := json.Marshal(data)
493         if err != nil {
494                 return nil, err
495         }
496         return xs.client.HTTPPostWithRes(url, string(body))
497 }
498
499 //  Re-established connection
500 func (xs *XdsServer) _reconnect() error {
501         err := xs._connect(true)
502         if err == nil {
503                 // Reload projects list for this server
504                 err = xs.projects.Init(xs)
505         }
506         return err
507 }
508
509 //  Established HTTP and WS connection and retrieve XDSServer config
510 func (xs *XdsServer) _connect(reConn bool) error {
511
512         xdsCfg := XdsServerConfig{}
513         if err := xs._HTTPGet("/config", &xdsCfg); err != nil {
514                 xs.Connected = false
515                 if !reConn {
516                         xs._NotifyState()
517                 }
518                 return err
519         }
520
521         if reConn && xs.ID != xdsCfg.ID {
522                 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
523         }
524
525         // Update local XDS config
526         xs.ID = xdsCfg.ID
527         xs.ServerConfig = &xdsCfg
528
529         // Establish WS connection and register listen
530         if err := xs._SocketConnect(); err != nil {
531                 xs.Connected = false
532                 xs._NotifyState()
533                 return err
534         }
535
536         xs.Connected = true
537         xs._NotifyState()
538         return nil
539 }
540
541 // Create WebSocket (io.socket) connection
542 func (xs *XdsServer) _SocketConnect() error {
543
544         xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
545
546         opts := &sio_client.Options{
547                 Transport: "websocket",
548                 Header:    make(map[string][]string),
549         }
550         opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
551
552         iosk, err := sio_client.NewClient(xs.BaseURL, opts)
553         if err != nil {
554                 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
555         }
556         xs.ioSock = iosk
557
558         // Register some listeners
559
560         iosk.On("error", func(err error) {
561                 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
562                 if xs.CBOnError != nil {
563                         xs.CBOnError(err)
564                 }
565         })
566
567         iosk.On("disconnection", func(err error) {
568                 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
569                 if xs.CBOnDisconnect != nil {
570                         xs.CBOnDisconnect(err)
571                 }
572                 xs.Connected = false
573                 xs._NotifyState()
574
575                 // Try to reconnect during 15min (or at least while not disabled)
576                 go func() {
577                         count := 0
578                         waitTime := 1
579                         for !xs.Disabled && !xs.Connected {
580                                 count++
581                                 if count%60 == 0 {
582                                         waitTime *= 5
583                                 }
584                                 if waitTime > 15*60 {
585                                         xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
586                                         return
587                                 }
588                                 time.Sleep(time.Second * time.Duration(waitTime))
589                                 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
590
591                                 xs._reconnect()
592                         }
593                 }()
594         })
595
596         // XXX - There is no connection event generated so, just consider that
597         // we are connected when NewClient return successfully
598         /* iosk.On("connection", func() { ... }) */
599         xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
600
601         return nil
602 }
603
604 // Send event to notify changes
605 func (xs *XdsServer) _NotifyState() {
606
607         evSts := apiv1.ServerCfg{
608                 ID:         xs.ID,
609                 URL:        xs.BaseURL,
610                 APIURL:     xs.APIURL,
611                 PartialURL: xs.PartialURL,
612                 ConnRetry:  xs.ConnRetry,
613                 Connected:  xs.Connected,
614         }
615         if err := xs.events.Emit(apiv1.EVTServerConfig, evSts); err != nil {
616                 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
617         }
618 }