Moved all structs exposed by API into apiv1 package
[src/xds/xds-agent.git] / lib / agent / xdsserver.go
1 package agent
2
3 import (
4         "encoding/json"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "net/http"
9         "strings"
10         "sync"
11         "time"
12
13         "github.com/gin-gonic/gin"
14         "github.com/iotbzh/xds-agent/lib/apiv1"
15         "github.com/iotbzh/xds-agent/lib/xdsconfig"
16         common "github.com/iotbzh/xds-common/golib"
17         uuid "github.com/satori/go.uuid"
18         sio_client "github.com/sebd71/go-socket.io-client"
19 )
20
21 // XdsServer .
22 type XdsServer struct {
23         *Context
24         ID           string
25         BaseURL      string
26         APIURL       string
27         PartialURL   string
28         ConnRetry    int
29         Connected    bool
30         Disabled     bool
31         ServerConfig *XdsServerConfig
32
33         // Events management
34         CBOnError      func(error)
35         CBOnDisconnect func(error)
36         sockEvents     map[string][]*caller
37         sockEventsLock *sync.Mutex
38
39         // Private fields
40         client    *common.HTTPClient
41         ioSock    *sio_client.Client
42         logOut    io.Writer
43         apiRouter *gin.RouterGroup
44 }
45
46 // XdsServerConfig Data return by GET /config
47 type XdsServerConfig struct {
48         ID               string           `json:"id"`
49         Version          string           `json:"version"`
50         APIVersion       string           `json:"apiVersion"`
51         VersionGitTag    string           `json:"gitTag"`
52         SupportedSharing map[string]bool  `json:"supportedSharing"`
53         Builder          XdsBuilderConfig `json:"builder"`
54 }
55
56 // XdsBuilderConfig represents the builder container configuration
57 type XdsBuilderConfig struct {
58         IP          string `json:"ip"`
59         Port        string `json:"port"`
60         SyncThingID string `json:"syncThingID"`
61 }
62
63 // XdsFolderType XdsServer folder type
64 type XdsFolderType string
65
66 const (
67         XdsTypePathMap   = "PathMap"
68         XdsTypeCloudSync = "CloudSync"
69         XdsTypeCifsSmb   = "CIFS"
70 )
71
72 // XdsFolderConfig XdsServer folder config
73 type XdsFolderConfig struct {
74         ID         string        `json:"id"`
75         Label      string        `json:"label"`
76         ClientPath string        `json:"path"`
77         Type       XdsFolderType `json:"type"`
78         Status     string        `json:"status"`
79         IsInSync   bool          `json:"isInSync"`
80         DefaultSdk string        `json:"defaultSdk"`
81         // Specific data depending on which Type is used
82         DataPathMap   XdsPathMapConfig   `json:"dataPathMap,omitempty"`
83         DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"`
84 }
85
86 // XdsPathMapConfig Path mapping specific data
87 type XdsPathMapConfig struct {
88         ServerPath   string `json:"serverPath"`
89         CheckFile    string `json:"checkFile"`
90         CheckContent string `json:"checkContent"`
91 }
92
93 // XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data
94 type XdsCloudSyncConfig struct {
95         SyncThingID   string `json:"syncThingID"`
96         STSvrStatus   string `json:"-"`
97         STSvrIsInSync bool   `json:"-"`
98         STLocStatus   string `json:"-"`
99         STLocIsInSync bool   `json:"-"`
100 }
101
102 // XdsEventRegisterArgs arguments used to register to XDS server events
103 type XdsEventRegisterArgs struct {
104         Name      string `json:"name"`
105         ProjectID string `json:"filterProjectID"`
106 }
107
108 // XdsEventFolderChange Folder change event structure
109 type XdsEventFolderChange struct {
110         Time   string          `json:"time"`
111         Type   string          `json:"type"`
112         Folder XdsFolderConfig `json:"folder"`
113 }
114
115 // caller Used to chain event listeners
116 type caller struct {
117         id        uuid.UUID
118         EventName string
119         Func      func(interface{})
120 }
121
122 const _IDTempoPrefix = "tempo-"
123
124 // NewXdsServer creates an instance of XdsServer
125 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
126         return &XdsServer{
127                 Context:    ctx,
128                 ID:         _IDTempoPrefix + uuid.NewV1().String(),
129                 BaseURL:    conf.URL,
130                 APIURL:     conf.APIBaseURL + conf.APIPartialURL,
131                 PartialURL: conf.APIPartialURL,
132                 ConnRetry:  conf.ConnRetry,
133                 Connected:  false,
134                 Disabled:   false,
135
136                 sockEvents:     make(map[string][]*caller),
137                 sockEventsLock: &sync.Mutex{},
138                 logOut:         ctx.Log.Out,
139         }
140 }
141
142 // Close Free and close XDS Server connection
143 func (xs *XdsServer) Close() error {
144         xs.Connected = false
145         xs.Disabled = true
146         xs.ioSock = nil
147         xs._NotifyState()
148         return nil
149 }
150
151 // Connect Establish HTTP connection with XDS Server
152 func (xs *XdsServer) Connect() error {
153         var err error
154         var retry int
155
156         xs.Disabled = false
157         xs.Connected = false
158
159         err = nil
160         for retry = xs.ConnRetry; retry > 0; retry-- {
161                 if err = xs._CreateConnectHTTP(); err == nil {
162                         break
163                 }
164                 if retry == xs.ConnRetry {
165                         // Notify only on the first conn error
166                         // doing that avoid 2 notifs (conn false; conn true) on startup
167                         xs._NotifyState()
168                 }
169                 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
170                 time.Sleep(time.Second)
171         }
172         if retry == 0 {
173                 // FIXME: re-use _reconnect to wait longer in background
174                 return fmt.Errorf("Connection to XDS Server failure")
175         }
176         if err != nil {
177                 return err
178         }
179
180         // Check HTTP connection and establish WS connection
181         err = xs._connect(false)
182
183         return err
184 }
185
186 // IsTempoID returns true when server as a temporary id
187 func (xs *XdsServer) IsTempoID() bool {
188         return strings.HasPrefix(xs.ID, _IDTempoPrefix)
189 }
190
191 // SetLoggerOutput Set logger ou
192 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
193         xs.logOut = out
194 }
195
196 // SendCommand Send a command to XDS Server
197 func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) {
198         url := cmd
199         if !strings.HasPrefix("/", cmd) {
200                 url = "/" + cmd
201         }
202         return xs.client.HTTPPostWithRes(url, string(body))
203 }
204
205 // GetVersion Send Get request to retrieve XDS Server version
206 func (xs *XdsServer) GetVersion(res interface{}) error {
207         return xs._HTTPGet("/version", &res)
208 }
209
210 // GetFolders Send GET request to get current folder configuration
211 func (xs *XdsServer) GetFolders(folders *[]XdsFolderConfig) error {
212         return xs._HTTPGet("/folders", folders)
213 }
214
215 // FolderAdd Send POST request to add a folder
216 func (xs *XdsServer) FolderAdd(fld *XdsFolderConfig, res interface{}) error {
217         response, err := xs._HTTPPost("/folder", fld)
218         if err != nil {
219                 return err
220         }
221         if response.StatusCode != 200 {
222                 return fmt.Errorf("FolderAdd error status=%s", response.Status)
223         }
224         // Result is a XdsFolderConfig that is equivalent to ProjectConfig
225         err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
226
227         return err
228 }
229
230 // FolderDelete Send DELETE request to delete a folder
231 func (xs *XdsServer) FolderDelete(id string) error {
232         return xs.client.HTTPDelete("/folder/" + id)
233 }
234
235 // FolderSync Send POST request to force synchronization of a folder
236 func (xs *XdsServer) FolderSync(id string) error {
237         return xs.client.HTTPPost("/folder/sync/"+id, "")
238 }
239
240 // SetAPIRouterGroup .
241 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
242         xs.apiRouter = r
243 }
244
245 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
246 func (xs *XdsServer) PassthroughGet(url string) {
247         if xs.apiRouter == nil {
248                 xs.Log.Errorf("apiRouter not set !")
249                 return
250         }
251
252         xs.apiRouter.GET(url, func(c *gin.Context) {
253                 var data interface{}
254                 if err := xs._HTTPGet(url, &data); err != nil {
255                         if strings.Contains(err.Error(), "connection refused") {
256                                 xs.Connected = false
257                                 xs._NotifyState()
258                         }
259                         common.APIError(c, err.Error())
260                         return
261                 }
262
263                 c.JSON(http.StatusOK, data)
264         })
265 }
266
267 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
268 func (xs *XdsServer) PassthroughPost(url string) {
269         if xs.apiRouter == nil {
270                 xs.Log.Errorf("apiRouter not set !")
271                 return
272         }
273
274         xs.apiRouter.POST(url, func(c *gin.Context) {
275                 bodyReq := []byte{}
276                 n, err := c.Request.Body.Read(bodyReq)
277                 if err != nil {
278                         common.APIError(c, err.Error())
279                         return
280                 }
281
282                 response, err := xs._HTTPPost(url, bodyReq[:n])
283                 if err != nil {
284                         common.APIError(c, err.Error())
285                         return
286                 }
287                 bodyRes, err := ioutil.ReadAll(response.Body)
288                 if err != nil {
289                         common.APIError(c, "Cannot read response body")
290                         return
291                 }
292                 c.JSON(http.StatusOK, string(bodyRes))
293         })
294 }
295
296 // EventRegister Post a request to register to an XdsServer event
297 func (xs *XdsServer) EventRegister(evName string, id string) error {
298         var err error
299         _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{
300                 Name:      evName,
301                 ProjectID: id,
302         })
303         return err
304 }
305
306 // EventOn Register a callback on events reception
307 func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, error) {
308         if xs.ioSock == nil {
309                 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
310         }
311
312         xs.sockEventsLock.Lock()
313         defer xs.sockEventsLock.Unlock()
314
315         if _, exist := xs.sockEvents[evName]; !exist {
316                 // Register listener only the first time
317                 evn := evName
318
319                 // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange
320                 var err error
321                 if evName == "event:FolderStateChanged" {
322                         err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
323                                 xs.sockEventsLock.Lock()
324                                 defer xs.sockEventsLock.Unlock()
325                                 for _, c := range xs.sockEvents[evn] {
326                                         c.Func(data)
327                                 }
328                                 return nil
329                         })
330                 } else {
331                         err = xs.ioSock.On(evn, f)
332                 }
333                 if err != nil {
334                         return uuid.Nil, err
335                 }
336         }
337
338         c := &caller{
339                 id:        uuid.NewV1(),
340                 EventName: evName,
341                 Func:      f,
342         }
343
344         xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
345         return c.id, nil
346 }
347
348 // EventOff Un-register a (or all) callbacks associated to an event
349 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
350         xs.sockEventsLock.Lock()
351         defer xs.sockEventsLock.Unlock()
352         if _, exist := xs.sockEvents[evName]; exist {
353                 if id == uuid.Nil {
354                         // Un-register all
355                         xs.sockEvents[evName] = []*caller{}
356                 } else {
357                         // Un-register only the specified callback
358                         for i, ff := range xs.sockEvents[evName] {
359                                 if uuid.Equal(ff.id, id) {
360                                         xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
361                                         break
362                                 }
363                         }
364                 }
365         }
366         return nil
367 }
368
369 // ProjectToFolder Convert Project structure to Folder structure
370 func (xs *XdsServer) ProjectToFolder(pPrj apiv1.ProjectConfig) *XdsFolderConfig {
371         stID := ""
372         if pPrj.Type == XdsTypeCloudSync {
373                 stID, _ = xs.SThg.IDGet()
374         }
375         fPrj := XdsFolderConfig{
376                 ID:         pPrj.ID,
377                 Label:      pPrj.Label,
378                 ClientPath: pPrj.ClientPath,
379                 Type:       XdsFolderType(pPrj.Type),
380                 Status:     pPrj.Status,
381                 IsInSync:   pPrj.IsInSync,
382                 DefaultSdk: pPrj.DefaultSdk,
383                 DataPathMap: XdsPathMapConfig{
384                         ServerPath: pPrj.ServerPath,
385                 },
386                 DataCloudSync: XdsCloudSyncConfig{
387                         SyncThingID:   stID,
388                         STLocIsInSync: pPrj.IsInSync,
389                         STLocStatus:   pPrj.Status,
390                         STSvrIsInSync: pPrj.IsInSync,
391                         STSvrStatus:   pPrj.Status,
392                 },
393         }
394
395         return &fPrj
396 }
397
398 // FolderToProject Convert Folder structure to Project structure
399 func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) apiv1.ProjectConfig {
400         inSync := fPrj.IsInSync
401         sts := fPrj.Status
402
403         if fPrj.Type == XdsTypeCloudSync {
404                 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
405
406                 sts = fPrj.DataCloudSync.STSvrStatus
407                 switch fPrj.DataCloudSync.STLocStatus {
408                 case apiv1.StatusErrorConfig, apiv1.StatusDisable, apiv1.StatusPause:
409                         sts = fPrj.DataCloudSync.STLocStatus
410                         break
411                 case apiv1.StatusSyncing:
412                         if sts != apiv1.StatusErrorConfig && sts != apiv1.StatusDisable && sts != apiv1.StatusPause {
413                                 sts = apiv1.StatusSyncing
414                         }
415                         break
416                 case apiv1.StatusEnable:
417                         // keep STSvrStatus
418                         break
419                 }
420         }
421
422         pPrj := apiv1.ProjectConfig{
423                 ID:         fPrj.ID,
424                 ServerID:   xs.ID,
425                 Label:      fPrj.Label,
426                 ClientPath: fPrj.ClientPath,
427                 ServerPath: fPrj.DataPathMap.ServerPath,
428                 Type:       apiv1.ProjectType(fPrj.Type),
429                 Status:     sts,
430                 IsInSync:   inSync,
431                 DefaultSdk: fPrj.DefaultSdk,
432         }
433         return pPrj
434 }
435
436 /***
437 ** Private functions
438 ***/
439
440 // Create HTTP client
441 func (xs *XdsServer) _CreateConnectHTTP() error {
442         var err error
443         xs.client, err = common.HTTPNewClient(xs.BaseURL,
444                 common.HTTPClientConfig{
445                         URLPrefix:           "/api/v1",
446                         HeaderClientKeyName: "Xds-Sid",
447                         CsrfDisable:         true,
448                         LogOut:              xs.logOut,
449                         LogPrefix:           "XDSSERVER: ",
450                         LogLevel:            common.HTTPLogLevelWarning,
451                 })
452
453         xs.client.SetLogLevel(xs.Log.Level.String())
454
455         if err != nil {
456                 msg := ": " + err.Error()
457                 if strings.Contains(err.Error(), "connection refused") {
458                         msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
459                 }
460                 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
461         }
462         if xs.client == nil {
463                 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
464         }
465
466         return nil
467 }
468
469 // _HTTPGet .
470 func (xs *XdsServer) _HTTPGet(url string, data interface{}) error {
471         var dd []byte
472         if err := xs.client.HTTPGet(url, &dd); err != nil {
473                 return err
474         }
475         return json.Unmarshal(dd, &data)
476 }
477
478 // _HTTPPost .
479 func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) {
480         body, err := json.Marshal(data)
481         if err != nil {
482                 return nil, err
483         }
484         return xs.client.HTTPPostWithRes(url, string(body))
485 }
486
487 //  Re-established connection
488 func (xs *XdsServer) _reconnect() error {
489         err := xs._connect(true)
490         if err == nil {
491                 // Reload projects list for this server
492                 err = xs.projects.Init(xs)
493         }
494         return err
495 }
496
497 //  Established HTTP and WS connection and retrieve XDSServer config
498 func (xs *XdsServer) _connect(reConn bool) error {
499
500         xdsCfg := XdsServerConfig{}
501         if err := xs._HTTPGet("/config", &xdsCfg); err != nil {
502                 xs.Connected = false
503                 if !reConn {
504                         xs._NotifyState()
505                 }
506                 return err
507         }
508
509         if reConn && xs.ID != xdsCfg.ID {
510                 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
511         }
512
513         // Update local XDS config
514         xs.ID = xdsCfg.ID
515         xs.ServerConfig = &xdsCfg
516
517         // Establish WS connection and register listen
518         if err := xs._SocketConnect(); err != nil {
519                 xs.Connected = false
520                 xs._NotifyState()
521                 return err
522         }
523
524         xs.Connected = true
525         xs._NotifyState()
526         return nil
527 }
528
529 // Create WebSocket (io.socket) connection
530 func (xs *XdsServer) _SocketConnect() error {
531
532         xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
533
534         opts := &sio_client.Options{
535                 Transport: "websocket",
536                 Header:    make(map[string][]string),
537         }
538         opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
539
540         iosk, err := sio_client.NewClient(xs.BaseURL, opts)
541         if err != nil {
542                 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
543         }
544         xs.ioSock = iosk
545
546         // Register some listeners
547
548         iosk.On("error", func(err error) {
549                 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
550                 if xs.CBOnError != nil {
551                         xs.CBOnError(err)
552                 }
553         })
554
555         iosk.On("disconnection", func(err error) {
556                 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
557                 if xs.CBOnDisconnect != nil {
558                         xs.CBOnDisconnect(err)
559                 }
560                 xs.Connected = false
561                 xs._NotifyState()
562
563                 // Try to reconnect during 15min (or at least while not disabled)
564                 go func() {
565                         count := 0
566                         waitTime := 1
567                         for !xs.Disabled && !xs.Connected {
568                                 count++
569                                 if count%60 == 0 {
570                                         waitTime *= 5
571                                 }
572                                 if waitTime > 15*60 {
573                                         xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
574                                         return
575                                 }
576                                 time.Sleep(time.Second * time.Duration(waitTime))
577                                 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
578
579                                 xs._reconnect()
580                         }
581                 }()
582         })
583
584         // XXX - There is no connection event generated so, just consider that
585         // we are connected when NewClient return successfully
586         /* iosk.On("connection", func() { ... }) */
587         xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
588
589         return nil
590 }
591
592 // Send event to notify changes
593 func (xs *XdsServer) _NotifyState() {
594
595         evSts := apiv1.ServerCfg{
596                 ID:         xs.ID,
597                 URL:        xs.BaseURL,
598                 APIURL:     xs.APIURL,
599                 PartialURL: xs.PartialURL,
600                 ConnRetry:  xs.ConnRetry,
601                 Connected:  xs.Connected,
602         }
603         if err := xs.events.Emit(apiv1.EVTServerConfig, evSts); err != nil {
604                 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
605         }
606 }