7b035798657420eb4591e32c47c1b9674d03a4e4
[src/xds/xds-agent.git] / lib / agent / xdsserver.go
1 package agent
2
3 import (
4         "encoding/json"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "net/http"
9         "strings"
10         "sync"
11         "time"
12
13         "github.com/gin-gonic/gin"
14         "github.com/iotbzh/xds-agent/lib/apiv1"
15         "github.com/iotbzh/xds-agent/lib/xdsconfig"
16         common "github.com/iotbzh/xds-common/golib"
17         uuid "github.com/satori/go.uuid"
18         sio_client "github.com/sebd71/go-socket.io-client"
19 )
20
21 // XdsServer .
22 type XdsServer struct {
23         *Context
24         ID           string
25         BaseURL      string
26         APIURL       string
27         PartialURL   string
28         ConnRetry    int
29         Connected    bool
30         Disabled     bool
31         ServerConfig *XdsServerConfig
32
33         // Events management
34         CBOnError      func(error)
35         CBOnDisconnect func(error)
36         sockEvents     map[string][]*caller
37         sockEventsLock *sync.Mutex
38
39         // Private fields
40         client    *common.HTTPClient
41         ioSock    *sio_client.Client
42         logOut    io.Writer
43         apiRouter *gin.RouterGroup
44 }
45
46 // XdsServerConfig Data return by GET /config
47 type XdsServerConfig struct {
48         ID               string           `json:"id"`
49         Version          string           `json:"version"`
50         APIVersion       string           `json:"apiVersion"`
51         VersionGitTag    string           `json:"gitTag"`
52         SupportedSharing map[string]bool  `json:"supportedSharing"`
53         Builder          XdsBuilderConfig `json:"builder"`
54 }
55
56 // XdsBuilderConfig represents the builder container configuration
57 type XdsBuilderConfig struct {
58         IP          string `json:"ip"`
59         Port        string `json:"port"`
60         SyncThingID string `json:"syncThingID"`
61 }
62
63 // XdsFolderType XdsServer folder type
64 type XdsFolderType string
65
66 const (
67         // XdsTypePathMap Path Mapping folder type
68         XdsTypePathMap = "PathMap"
69         // XdsTypeCloudSync Cloud synchronization (AKA syncthing) folder type
70         XdsTypeCloudSync = "CloudSync"
71         // XdsTypeCifsSmb CIFS (AKA samba) folder type
72         XdsTypeCifsSmb = "CIFS"
73 )
74
75 // XdsFolderConfig XdsServer folder config
76 type XdsFolderConfig struct {
77         ID         string        `json:"id"`
78         Label      string        `json:"label"`
79         ClientPath string        `json:"path"`
80         Type       XdsFolderType `json:"type"`
81         Status     string        `json:"status"`
82         IsInSync   bool          `json:"isInSync"`
83         DefaultSdk string        `json:"defaultSdk"`
84         ClientData string        `json:"clientData"` // free form field that can used by client
85
86         // Specific data depending on which Type is used
87         DataPathMap   XdsPathMapConfig   `json:"dataPathMap,omitempty"`
88         DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"`
89 }
90
91 // XdsPathMapConfig Path mapping specific data
92 type XdsPathMapConfig struct {
93         ServerPath   string `json:"serverPath"`
94         CheckFile    string `json:"checkFile"`
95         CheckContent string `json:"checkContent"`
96 }
97
98 // XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data
99 type XdsCloudSyncConfig struct {
100         SyncThingID   string `json:"syncThingID"`
101         STSvrStatus   string `json:"-"`
102         STSvrIsInSync bool   `json:"-"`
103         STLocStatus   string `json:"-"`
104         STLocIsInSync bool   `json:"-"`
105 }
106
107 // XdsEventRegisterArgs arguments used to register to XDS server events
108 type XdsEventRegisterArgs struct {
109         Name      string `json:"name"`
110         ProjectID string `json:"filterProjectID"`
111 }
112
113 // XdsEventFolderChange Folder change event structure
114 type XdsEventFolderChange struct {
115         Time   string          `json:"time"`
116         Type   string          `json:"type"`
117         Folder XdsFolderConfig `json:"folder"`
118 }
119
120 // EventCB Event emitter callback
121 type EventCB func(privData interface{}, evtData interface{}) error
122
123 // caller Used to chain event listeners
124 type caller struct {
125         id          uuid.UUID
126         EventName   string
127         Func        EventCB
128         PrivateData interface{}
129 }
130
131 const _IDTempoPrefix = "tempo-"
132
133 // NewXdsServer creates an instance of XdsServer
134 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
135         return &XdsServer{
136                 Context:    ctx,
137                 ID:         _IDTempoPrefix + uuid.NewV1().String(),
138                 BaseURL:    conf.URL,
139                 APIURL:     conf.APIBaseURL + conf.APIPartialURL,
140                 PartialURL: conf.APIPartialURL,
141                 ConnRetry:  conf.ConnRetry,
142                 Connected:  false,
143                 Disabled:   false,
144
145                 sockEvents:     make(map[string][]*caller),
146                 sockEventsLock: &sync.Mutex{},
147                 logOut:         ctx.Log.Out,
148         }
149 }
150
151 // Close Free and close XDS Server connection
152 func (xs *XdsServer) Close() error {
153         xs.Connected = false
154         xs.Disabled = true
155         xs.ioSock = nil
156         xs._NotifyState()
157         return nil
158 }
159
160 // Connect Establish HTTP connection with XDS Server
161 func (xs *XdsServer) Connect() error {
162         var err error
163         var retry int
164
165         xs.Disabled = false
166         xs.Connected = false
167
168         err = nil
169         for retry = xs.ConnRetry; retry > 0; retry-- {
170                 if err = xs._CreateConnectHTTP(); err == nil {
171                         break
172                 }
173                 if retry == xs.ConnRetry {
174                         // Notify only on the first conn error
175                         // doing that avoid 2 notifs (conn false; conn true) on startup
176                         xs._NotifyState()
177                 }
178                 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
179                 time.Sleep(time.Second)
180         }
181         if retry == 0 {
182                 // FIXME: re-use _reconnect to wait longer in background
183                 return fmt.Errorf("Connection to XDS Server failure")
184         }
185         if err != nil {
186                 return err
187         }
188
189         // Check HTTP connection and establish WS connection
190         err = xs._connect(false)
191
192         return err
193 }
194
195 // IsTempoID returns true when server as a temporary id
196 func (xs *XdsServer) IsTempoID() bool {
197         return strings.HasPrefix(xs.ID, _IDTempoPrefix)
198 }
199
200 // SetLoggerOutput Set logger ou
201 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
202         xs.logOut = out
203 }
204
205 // SendCommand Send a command to XDS Server
206 func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) {
207         url := cmd
208         if !strings.HasPrefix("/", cmd) {
209                 url = "/" + cmd
210         }
211         return xs.client.HTTPPostWithRes(url, string(body))
212 }
213
214 // GetVersion Send Get request to retrieve XDS Server version
215 func (xs *XdsServer) GetVersion(res interface{}) error {
216         return xs._HTTPGet("/version", &res)
217 }
218
219 // GetFolders Send GET request to get current folder configuration
220 func (xs *XdsServer) GetFolders(folders *[]XdsFolderConfig) error {
221         return xs._HTTPGet("/folders", folders)
222 }
223
224 // FolderAdd Send POST request to add a folder
225 func (xs *XdsServer) FolderAdd(fld *XdsFolderConfig, res interface{}) error {
226         response, err := xs._HTTPPost("/folders", fld)
227         if err != nil {
228                 return err
229         }
230         if response.StatusCode != 200 {
231                 return fmt.Errorf("FolderAdd error status=%s", response.Status)
232         }
233         // Result is a XdsFolderConfig that is equivalent to ProjectConfig
234         err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
235
236         return err
237 }
238
239 // FolderDelete Send DELETE request to delete a folder
240 func (xs *XdsServer) FolderDelete(id string) error {
241         return xs.client.HTTPDelete("/folders/" + id)
242 }
243
244 // FolderSync Send POST request to force synchronization of a folder
245 func (xs *XdsServer) FolderSync(id string) error {
246         return xs.client.HTTPPost("/folders/sync/"+id, "")
247 }
248
249 // FolderUpdate Send PUT request to update a folder
250 func (xs *XdsServer) FolderUpdate(fld *XdsFolderConfig, resFld *XdsFolderConfig) error {
251         return xs.client.Put("/folders/"+fld.ID, fld, resFld)
252 }
253
254 // SetAPIRouterGroup .
255 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
256         xs.apiRouter = r
257 }
258
259 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
260 func (xs *XdsServer) PassthroughGet(url string) {
261         if xs.apiRouter == nil {
262                 xs.Log.Errorf("apiRouter not set !")
263                 return
264         }
265
266         xs.apiRouter.GET(url, func(c *gin.Context) {
267                 var data interface{}
268                 // Take care of param (eg. id in /projects/:id)
269                 nURL := url
270                 if strings.Contains(url, ":") {
271                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
272                 }
273                 // Send Get request
274                 if err := xs._HTTPGet(nURL, &data); err != nil {
275                         if strings.Contains(err.Error(), "connection refused") {
276                                 xs.Connected = false
277                                 xs._NotifyState()
278                         }
279                         common.APIError(c, err.Error())
280                         return
281                 }
282
283                 c.JSON(http.StatusOK, data)
284         })
285 }
286
287 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
288 func (xs *XdsServer) PassthroughPost(url string) {
289         if xs.apiRouter == nil {
290                 xs.Log.Errorf("apiRouter not set !")
291                 return
292         }
293
294         xs.apiRouter.POST(url, func(c *gin.Context) {
295                 bodyReq := []byte{}
296                 n, err := c.Request.Body.Read(bodyReq)
297                 if err != nil {
298                         common.APIError(c, err.Error())
299                         return
300                 }
301
302                 // Take care of param (eg. id in /projects/:id)
303                 nURL := url
304                 if strings.Contains(url, ":") {
305                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
306                 }
307                 // Send Post request
308                 response, err := xs._HTTPPost(nURL, bodyReq[:n])
309                 if err != nil {
310                         common.APIError(c, err.Error())
311                         return
312                 }
313                 bodyRes, err := ioutil.ReadAll(response.Body)
314                 if err != nil {
315                         common.APIError(c, "Cannot read response body")
316                         return
317                 }
318                 c.JSON(http.StatusOK, string(bodyRes))
319         })
320 }
321
322 // EventRegister Post a request to register to an XdsServer event
323 func (xs *XdsServer) EventRegister(evName string, id string) error {
324         var err error
325         _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{
326                 Name:      evName,
327                 ProjectID: id,
328         })
329         return err
330 }
331
332 // EventOn Register a callback on events reception
333 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
334         if xs.ioSock == nil {
335                 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
336         }
337
338         xs.sockEventsLock.Lock()
339         defer xs.sockEventsLock.Unlock()
340
341         if _, exist := xs.sockEvents[evName]; !exist {
342                 // Register listener only the first time
343                 evn := evName
344
345                 // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange
346                 var err error
347                 if evName == "event:folder-state-change" {
348                         err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
349                                 xs.sockEventsLock.Lock()
350                                 sEvts := make([]*caller, len(xs.sockEvents[evn]))
351                                 copy(sEvts, xs.sockEvents[evn])
352                                 xs.sockEventsLock.Unlock()
353                                 for _, c := range sEvts {
354                                         c.Func(c.PrivateData, data)
355                                 }
356                                 return nil
357                         })
358                 } else {
359                         err = xs.ioSock.On(evn, func(data interface{}) error {
360                                 xs.sockEventsLock.Lock()
361                                 sEvts := make([]*caller, len(xs.sockEvents[evn]))
362                                 copy(sEvts, xs.sockEvents[evn])
363                                 xs.sockEventsLock.Unlock()
364                                 for _, c := range sEvts {
365                                         c.Func(c.PrivateData, data)
366                                 }
367                                 return nil
368                         })
369                 }
370                 if err != nil {
371                         return uuid.Nil, err
372                 }
373         }
374
375         c := &caller{
376                 id:          uuid.NewV1(),
377                 EventName:   evName,
378                 Func:        f,
379                 PrivateData: privData,
380         }
381
382         xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
383         return c.id, nil
384 }
385
386 // EventOff Un-register a (or all) callbacks associated to an event
387 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
388         xs.sockEventsLock.Lock()
389         defer xs.sockEventsLock.Unlock()
390         if _, exist := xs.sockEvents[evName]; exist {
391                 if id == uuid.Nil {
392                         // Un-register all
393                         xs.sockEvents[evName] = []*caller{}
394                 } else {
395                         // Un-register only the specified callback
396                         for i, ff := range xs.sockEvents[evName] {
397                                 if uuid.Equal(ff.id, id) {
398                                         xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
399                                         break
400                                 }
401                         }
402                 }
403         }
404         return nil
405 }
406
407 // ProjectToFolder Convert Project structure to Folder structure
408 func (xs *XdsServer) ProjectToFolder(pPrj apiv1.ProjectConfig) *XdsFolderConfig {
409         stID := ""
410         if pPrj.Type == XdsTypeCloudSync {
411                 stID, _ = xs.SThg.IDGet()
412         }
413         // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
414         fPrj := XdsFolderConfig{
415                 ID:         pPrj.ID,
416                 Label:      pPrj.Label,
417                 ClientPath: pPrj.ClientPath,
418                 Type:       XdsFolderType(pPrj.Type),
419                 Status:     pPrj.Status,
420                 IsInSync:   pPrj.IsInSync,
421                 DefaultSdk: pPrj.DefaultSdk,
422                 ClientData: pPrj.ClientData,
423                 DataPathMap: XdsPathMapConfig{
424                         ServerPath: pPrj.ServerPath,
425                 },
426                 DataCloudSync: XdsCloudSyncConfig{
427                         SyncThingID:   stID,
428                         STLocIsInSync: pPrj.IsInSync,
429                         STLocStatus:   pPrj.Status,
430                         STSvrIsInSync: pPrj.IsInSync,
431                         STSvrStatus:   pPrj.Status,
432                 },
433         }
434
435         return &fPrj
436 }
437
438 // FolderToProject Convert Folder structure to Project structure
439 func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) apiv1.ProjectConfig {
440         inSync := fPrj.IsInSync
441         sts := fPrj.Status
442
443         if fPrj.Type == XdsTypeCloudSync {
444                 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
445
446                 sts = fPrj.DataCloudSync.STSvrStatus
447                 switch fPrj.DataCloudSync.STLocStatus {
448                 case apiv1.StatusErrorConfig, apiv1.StatusDisable, apiv1.StatusPause:
449                         sts = fPrj.DataCloudSync.STLocStatus
450                         break
451                 case apiv1.StatusSyncing:
452                         if sts != apiv1.StatusErrorConfig && sts != apiv1.StatusDisable && sts != apiv1.StatusPause {
453                                 sts = apiv1.StatusSyncing
454                         }
455                         break
456                 case apiv1.StatusEnable:
457                         // keep STSvrStatus
458                         break
459                 }
460         }
461
462         pPrj := apiv1.ProjectConfig{
463                 ID:         fPrj.ID,
464                 ServerID:   xs.ID,
465                 Label:      fPrj.Label,
466                 ClientPath: fPrj.ClientPath,
467                 ServerPath: fPrj.DataPathMap.ServerPath,
468                 Type:       apiv1.ProjectType(fPrj.Type),
469                 Status:     sts,
470                 IsInSync:   inSync,
471                 DefaultSdk: fPrj.DefaultSdk,
472                 ClientData: fPrj.ClientData,
473         }
474         return pPrj
475 }
476
477 /***
478 ** Private functions
479 ***/
480
481 // Create HTTP client
482 func (xs *XdsServer) _CreateConnectHTTP() error {
483         var err error
484         xs.client, err = common.HTTPNewClient(xs.BaseURL,
485                 common.HTTPClientConfig{
486                         URLPrefix:           "/api/v1",
487                         HeaderClientKeyName: "Xds-Sid",
488                         CsrfDisable:         true,
489                         LogOut:              xs.logOut,
490                         LogPrefix:           "XDSSERVER: ",
491                         LogLevel:            common.HTTPLogLevelWarning,
492                 })
493
494         xs.client.SetLogLevel(xs.Log.Level.String())
495
496         if err != nil {
497                 msg := ": " + err.Error()
498                 if strings.Contains(err.Error(), "connection refused") {
499                         msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
500                 }
501                 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
502         }
503         if xs.client == nil {
504                 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
505         }
506
507         return nil
508 }
509
510 // _HTTPGet .
511 func (xs *XdsServer) _HTTPGet(url string, data interface{}) error {
512         var dd []byte
513         if err := xs.client.HTTPGet(url, &dd); err != nil {
514                 return err
515         }
516         return json.Unmarshal(dd, &data)
517 }
518
519 // _HTTPPost .
520 func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) {
521         body, err := json.Marshal(data)
522         if err != nil {
523                 return nil, err
524         }
525         return xs.client.HTTPPostWithRes(url, string(body))
526 }
527
528 //  Re-established connection
529 func (xs *XdsServer) _reconnect() error {
530         err := xs._connect(true)
531         if err == nil {
532                 // Reload projects list for this server
533                 err = xs.projects.Init(xs)
534         }
535         return err
536 }
537
538 //  Established HTTP and WS connection and retrieve XDSServer config
539 func (xs *XdsServer) _connect(reConn bool) error {
540
541         xdsCfg := XdsServerConfig{}
542         if err := xs._HTTPGet("/config", &xdsCfg); err != nil {
543                 xs.Connected = false
544                 if !reConn {
545                         xs._NotifyState()
546                 }
547                 return err
548         }
549
550         if reConn && xs.ID != xdsCfg.ID {
551                 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
552         }
553
554         // Update local XDS config
555         xs.ID = xdsCfg.ID
556         xs.ServerConfig = &xdsCfg
557
558         // Establish WS connection and register listen
559         if err := xs._SocketConnect(); err != nil {
560                 xs.Connected = false
561                 xs._NotifyState()
562                 return err
563         }
564
565         xs.Connected = true
566         xs._NotifyState()
567         return nil
568 }
569
570 // Create WebSocket (io.socket) connection
571 func (xs *XdsServer) _SocketConnect() error {
572
573         xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
574
575         opts := &sio_client.Options{
576                 Transport: "websocket",
577                 Header:    make(map[string][]string),
578         }
579         opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
580
581         iosk, err := sio_client.NewClient(xs.BaseURL, opts)
582         if err != nil {
583                 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
584         }
585         xs.ioSock = iosk
586
587         // Register some listeners
588
589         iosk.On("error", func(err error) {
590                 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
591                 if xs.CBOnError != nil {
592                         xs.CBOnError(err)
593                 }
594         })
595
596         iosk.On("disconnection", func(err error) {
597                 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
598                 if xs.CBOnDisconnect != nil {
599                         xs.CBOnDisconnect(err)
600                 }
601                 xs.Connected = false
602                 xs._NotifyState()
603
604                 // Try to reconnect during 15min (or at least while not disabled)
605                 go func() {
606                         count := 0
607                         waitTime := 1
608                         for !xs.Disabled && !xs.Connected {
609                                 count++
610                                 if count%60 == 0 {
611                                         waitTime *= 5
612                                 }
613                                 if waitTime > 15*60 {
614                                         xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
615                                         return
616                                 }
617                                 time.Sleep(time.Second * time.Duration(waitTime))
618                                 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
619
620                                 xs._reconnect()
621                         }
622                 }()
623         })
624
625         // XXX - There is no connection event generated so, just consider that
626         // we are connected when NewClient return successfully
627         /* iosk.On("connection", func() { ... }) */
628         xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
629
630         return nil
631 }
632
633 // Send event to notify changes
634 func (xs *XdsServer) _NotifyState() {
635
636         evSts := apiv1.ServerCfg{
637                 ID:         xs.ID,
638                 URL:        xs.BaseURL,
639                 APIURL:     xs.APIURL,
640                 PartialURL: xs.PartialURL,
641                 ConnRetry:  xs.ConnRetry,
642                 Connected:  xs.Connected,
643         }
644         if err := xs.events.Emit(apiv1.EVTServerConfig, evSts, ""); err != nil {
645                 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
646         }
647 }