Fixed Syncthing folder status events and exec command.
[src/xds/xds-agent.git] / lib / agent / xdsserver.go
1 package agent
2
3 import (
4         "encoding/json"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "net/http"
9         "strings"
10         "sync"
11         "time"
12
13         "github.com/gin-gonic/gin"
14         "github.com/iotbzh/xds-agent/lib/xdsconfig"
15         common "github.com/iotbzh/xds-common/golib"
16         uuid "github.com/satori/go.uuid"
17         sio_client "github.com/sebd71/go-socket.io-client"
18 )
19
20 // XdsServer .
21 type XdsServer struct {
22         *Context
23         ID           string
24         BaseURL      string
25         APIURL       string
26         PartialURL   string
27         ConnRetry    int
28         Connected    bool
29         Disabled     bool
30         ServerConfig *XdsServerConfig
31
32         // Events management
33         CBOnError      func(error)
34         CBOnDisconnect func(error)
35         sockEvents     map[string][]*caller
36         sockEventsLock *sync.Mutex
37
38         // Private fields
39         client    *common.HTTPClient
40         ioSock    *sio_client.Client
41         logOut    io.Writer
42         apiRouter *gin.RouterGroup
43 }
44
45 // XdsServerConfig Data return by GET /config
46 type XdsServerConfig struct {
47         ID               string           `json:"id"`
48         Version          string           `json:"version"`
49         APIVersion       string           `json:"apiVersion"`
50         VersionGitTag    string           `json:"gitTag"`
51         SupportedSharing map[string]bool  `json:"supportedSharing"`
52         Builder          XdsBuilderConfig `json:"builder"`
53 }
54
55 // XdsBuilderConfig represents the builder container configuration
56 type XdsBuilderConfig struct {
57         IP          string `json:"ip"`
58         Port        string `json:"port"`
59         SyncThingID string `json:"syncThingID"`
60 }
61
62 // XdsFolderType XdsServer folder type
63 type XdsFolderType string
64
65 const (
66         XdsTypePathMap   = "PathMap"
67         XdsTypeCloudSync = "CloudSync"
68         XdsTypeCifsSmb   = "CIFS"
69 )
70
71 // XdsFolderConfig XdsServer folder config
72 type XdsFolderConfig struct {
73         ID         string        `json:"id"`
74         Label      string        `json:"label"`
75         ClientPath string        `json:"path"`
76         Type       XdsFolderType `json:"type"`
77         Status     string        `json:"status"`
78         IsInSync   bool          `json:"isInSync"`
79         DefaultSdk string        `json:"defaultSdk"`
80         // Specific data depending on which Type is used
81         DataPathMap   XdsPathMapConfig   `json:"dataPathMap,omitempty"`
82         DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"`
83 }
84
85 // XdsPathMapConfig Path mapping specific data
86 type XdsPathMapConfig struct {
87         ServerPath string `json:"serverPath"`
88 }
89
90 // XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data
91 type XdsCloudSyncConfig struct {
92         SyncThingID   string `json:"syncThingID"`
93         STSvrStatus   string `json:"-"`
94         STSvrIsInSync bool   `json:"-"`
95         STLocStatus   string `json:"-"`
96         STLocIsInSync bool   `json:"-"`
97 }
98
99 // XdsEventRegisterArgs arguments used to register to XDS server events
100 type XdsEventRegisterArgs struct {
101         Name      string `json:"name"`
102         ProjectID string `json:"filterProjectID"`
103 }
104
105 // XdsEventFolderChange Folder change event structure
106 type XdsEventFolderChange struct {
107         Time   string          `json:"time"`
108         Type   string          `json:"type"`
109         Folder XdsFolderConfig `json:"folder"`
110 }
111
112 // caller Used to chain event listeners
113 type caller struct {
114         id        uuid.UUID
115         EventName string
116         Func      func(interface{})
117 }
118
119 const _IDTempoPrefix = "tempo-"
120
121 // NewXdsServer creates an instance of XdsServer
122 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
123         return &XdsServer{
124                 Context:    ctx,
125                 ID:         _IDTempoPrefix + uuid.NewV1().String(),
126                 BaseURL:    conf.URL,
127                 APIURL:     conf.APIBaseURL + conf.APIPartialURL,
128                 PartialURL: conf.APIPartialURL,
129                 ConnRetry:  conf.ConnRetry,
130                 Connected:  false,
131                 Disabled:   false,
132
133                 sockEvents:     make(map[string][]*caller),
134                 sockEventsLock: &sync.Mutex{},
135                 logOut:         ctx.Log.Out,
136         }
137 }
138
139 // Close Free and close XDS Server connection
140 func (xs *XdsServer) Close() error {
141         xs.Connected = false
142         xs.Disabled = true
143         xs.ioSock = nil
144         xs._NotifyState()
145         return nil
146 }
147
148 // Connect Establish HTTP connection with XDS Server
149 func (xs *XdsServer) Connect() error {
150         var err error
151         var retry int
152
153         xs.Disabled = false
154         xs.Connected = false
155
156         err = nil
157         for retry = xs.ConnRetry; retry > 0; retry-- {
158                 if err = xs._CreateConnectHTTP(); err == nil {
159                         break
160                 }
161                 if retry == xs.ConnRetry {
162                         // Notify only on the first conn error
163                         // doing that avoid 2 notifs (conn false; conn true) on startup
164                         xs._NotifyState()
165                 }
166                 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
167                 time.Sleep(time.Second)
168         }
169         if retry == 0 {
170                 // FIXME: re-use _reconnect to wait longer in background
171                 return fmt.Errorf("Connection to XDS Server failure")
172         }
173         if err != nil {
174                 return err
175         }
176
177         // Check HTTP connection and establish WS connection
178         err = xs._connect(false)
179
180         return err
181 }
182
183 // IsTempoID returns true when server as a temporary id
184 func (xs *XdsServer) IsTempoID() bool {
185         return strings.HasPrefix(xs.ID, _IDTempoPrefix)
186 }
187
188 // SetLoggerOutput Set logger ou
189 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
190         xs.logOut = out
191 }
192
193 // SendCommand Send a command to XDS Server
194 func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) {
195         url := cmd
196         if !strings.HasPrefix("/", cmd) {
197                 url = "/" + cmd
198         }
199         return xs.client.HTTPPostWithRes(url, string(body))
200 }
201
202 // GetVersion Send Get request to retrieve XDS Server version
203 func (xs *XdsServer) GetVersion(res interface{}) error {
204         return xs._HTTPGet("/version", &res)
205 }
206
207 // GetFolders Send GET request to get current folder configuration
208 func (xs *XdsServer) GetFolders(prjs *[]XdsFolderConfig) error {
209         return xs._HTTPGet("/folders", prjs)
210 }
211
212 // FolderAdd Send POST request to add a folder
213 func (xs *XdsServer) FolderAdd(prj *XdsFolderConfig, res interface{}) error {
214         response, err := xs._HTTPPost("/folder", prj)
215         if err != nil {
216                 return err
217         }
218         if response.StatusCode != 200 {
219                 return fmt.Errorf("FolderAdd error status=%s", response.Status)
220         }
221         // Result is a XdsFolderConfig that is equivalent to ProjectConfig
222         err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
223
224         return err
225 }
226
227 // FolderDelete Send DELETE request to delete a folder
228 func (xs *XdsServer) FolderDelete(id string) error {
229         return xs.client.HTTPDelete("/folder/" + id)
230 }
231
232 // FolderSync Send POST request to force synchronization of a folder
233 func (xs *XdsServer) FolderSync(id string) error {
234         return xs.client.HTTPPost("/folder/sync/"+id, "")
235 }
236
237 // SetAPIRouterGroup .
238 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
239         xs.apiRouter = r
240 }
241
242 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
243 func (xs *XdsServer) PassthroughGet(url string) {
244         if xs.apiRouter == nil {
245                 xs.Log.Errorf("apiRouter not set !")
246                 return
247         }
248
249         xs.apiRouter.GET(url, func(c *gin.Context) {
250                 var data interface{}
251                 if err := xs._HTTPGet(url, &data); err != nil {
252                         if strings.Contains(err.Error(), "connection refused") {
253                                 xs.Connected = false
254                                 xs._NotifyState()
255                         }
256                         common.APIError(c, err.Error())
257                         return
258                 }
259
260                 c.JSON(http.StatusOK, data)
261         })
262 }
263
264 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
265 func (xs *XdsServer) PassthroughPost(url string) {
266         if xs.apiRouter == nil {
267                 xs.Log.Errorf("apiRouter not set !")
268                 return
269         }
270
271         xs.apiRouter.POST(url, func(c *gin.Context) {
272                 bodyReq := []byte{}
273                 n, err := c.Request.Body.Read(bodyReq)
274                 if err != nil {
275                         common.APIError(c, err.Error())
276                         return
277                 }
278
279                 response, err := xs._HTTPPost(url, bodyReq[:n])
280                 if err != nil {
281                         common.APIError(c, err.Error())
282                         return
283                 }
284                 bodyRes, err := ioutil.ReadAll(response.Body)
285                 if err != nil {
286                         common.APIError(c, "Cannot read response body")
287                         return
288                 }
289                 c.JSON(http.StatusOK, string(bodyRes))
290         })
291 }
292
293 // EventRegister Post a request to register to an XdsServer event
294 func (xs *XdsServer) EventRegister(evName string, id string) error {
295         var err error
296         _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{
297                 Name:      evName,
298                 ProjectID: id,
299         })
300         return err
301 }
302
303 // EventOn Register a callback on events reception
304 func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, error) {
305         if xs.ioSock == nil {
306                 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
307         }
308
309         xs.sockEventsLock.Lock()
310         defer xs.sockEventsLock.Unlock()
311
312         if _, exist := xs.sockEvents[evName]; !exist {
313                 // Register listener only the first time
314                 evn := evName
315
316                 // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange
317                 var err error
318                 if evName == "event:FolderStateChanged" {
319                         err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
320                                 xs.sockEventsLock.Lock()
321                                 defer xs.sockEventsLock.Unlock()
322                                 for _, c := range xs.sockEvents[evn] {
323                                         c.Func(data)
324                                 }
325                                 return nil
326                         })
327                 } else {
328                         err = xs.ioSock.On(evn, f)
329                 }
330                 if err != nil {
331                         return uuid.Nil, err
332                 }
333         }
334
335         c := &caller{
336                 id:        uuid.NewV1(),
337                 EventName: evName,
338                 Func:      f,
339         }
340
341         xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
342         return c.id, nil
343 }
344
345 // EventOff Un-register a (or all) callbacks associated to an event
346 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
347         xs.sockEventsLock.Lock()
348         defer xs.sockEventsLock.Unlock()
349         if _, exist := xs.sockEvents[evName]; exist {
350                 if id == uuid.Nil {
351                         // Un-register all
352                         xs.sockEvents[evName] = []*caller{}
353                 } else {
354                         // Un-register only the specified callback
355                         for i, ff := range xs.sockEvents[evName] {
356                                 if uuid.Equal(ff.id, id) {
357                                         xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
358                                         break
359                                 }
360                         }
361                 }
362         }
363         return nil
364 }
365
366 // ProjectToFolder Convert Project structure to Folder structure
367 func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *XdsFolderConfig {
368         stID := ""
369         if pPrj.Type == XdsTypeCloudSync {
370                 stID, _ = xs.SThg.IDGet()
371         }
372         fPrj := XdsFolderConfig{
373                 ID:         pPrj.ID,
374                 Label:      pPrj.Label,
375                 ClientPath: pPrj.ClientPath,
376                 Type:       XdsFolderType(pPrj.Type),
377                 Status:     pPrj.Status,
378                 IsInSync:   pPrj.IsInSync,
379                 DefaultSdk: pPrj.DefaultSdk,
380                 DataPathMap: XdsPathMapConfig{
381                         ServerPath: pPrj.ServerPath,
382                 },
383                 DataCloudSync: XdsCloudSyncConfig{
384                         SyncThingID:   stID,
385                         STLocIsInSync: pPrj.IsInSync,
386                         STLocStatus:   pPrj.Status,
387                         STSvrIsInSync: pPrj.IsInSync,
388                         STSvrStatus:   pPrj.Status,
389                 },
390         }
391
392         return &fPrj
393 }
394
395 // FolderToProject Convert Folder structure to Project structure
396 func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) ProjectConfig {
397         inSync := fPrj.IsInSync
398         sts := fPrj.Status
399
400         if fPrj.Type == XdsTypeCloudSync {
401                 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
402
403                 sts = fPrj.DataCloudSync.STSvrStatus
404                 switch fPrj.DataCloudSync.STLocStatus {
405                 case StatusErrorConfig, StatusDisable, StatusPause:
406                         sts = fPrj.DataCloudSync.STLocStatus
407                         break
408                 case StatusSyncing:
409                         if sts != StatusErrorConfig && sts != StatusDisable && sts != StatusPause {
410                                 sts = StatusSyncing
411                         }
412                         break
413                 case StatusEnable:
414                         // keep STSvrStatus
415                         break
416                 }
417         }
418
419         pPrj := ProjectConfig{
420                 ID:         fPrj.ID,
421                 ServerID:   xs.ID,
422                 Label:      fPrj.Label,
423                 ClientPath: fPrj.ClientPath,
424                 ServerPath: fPrj.DataPathMap.ServerPath,
425                 Type:       ProjectType(fPrj.Type),
426                 Status:     sts,
427                 IsInSync:   inSync,
428                 DefaultSdk: fPrj.DefaultSdk,
429         }
430         return pPrj
431 }
432
433 /***
434 ** Private functions
435 ***/
436
437 // Create HTTP client
438 func (xs *XdsServer) _CreateConnectHTTP() error {
439         var err error
440         xs.client, err = common.HTTPNewClient(xs.BaseURL,
441                 common.HTTPClientConfig{
442                         URLPrefix:           "/api/v1",
443                         HeaderClientKeyName: "Xds-Sid",
444                         CsrfDisable:         true,
445                         LogOut:              xs.logOut,
446                         LogPrefix:           "XDSSERVER: ",
447                         LogLevel:            common.HTTPLogLevelWarning,
448                 })
449
450         xs.client.SetLogLevel(xs.Log.Level.String())
451
452         if err != nil {
453                 msg := ": " + err.Error()
454                 if strings.Contains(err.Error(), "connection refused") {
455                         msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
456                 }
457                 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
458         }
459         if xs.client == nil {
460                 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
461         }
462
463         return nil
464 }
465
466 // _HTTPGet .
467 func (xs *XdsServer) _HTTPGet(url string, data interface{}) error {
468         var dd []byte
469         if err := xs.client.HTTPGet(url, &dd); err != nil {
470                 return err
471         }
472         return json.Unmarshal(dd, &data)
473 }
474
475 // _HTTPPost .
476 func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) {
477         body, err := json.Marshal(data)
478         if err != nil {
479                 return nil, err
480         }
481         return xs.client.HTTPPostWithRes(url, string(body))
482 }
483
484 //  Re-established connection
485 func (xs *XdsServer) _reconnect() error {
486         err := xs._connect(true)
487         if err == nil {
488                 // Reload projects list for this server
489                 err = xs.projects.Init(xs)
490         }
491         return err
492 }
493
494 //  Established HTTP and WS connection and retrieve XDSServer config
495 func (xs *XdsServer) _connect(reConn bool) error {
496
497         xdsCfg := XdsServerConfig{}
498         if err := xs._HTTPGet("/config", &xdsCfg); err != nil {
499                 xs.Connected = false
500                 if !reConn {
501                         xs._NotifyState()
502                 }
503                 return err
504         }
505
506         if reConn && xs.ID != xdsCfg.ID {
507                 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
508         }
509
510         // Update local XDS config
511         xs.ID = xdsCfg.ID
512         xs.ServerConfig = &xdsCfg
513
514         // Establish WS connection and register listen
515         if err := xs._SocketConnect(); err != nil {
516                 xs.Connected = false
517                 xs._NotifyState()
518                 return err
519         }
520
521         xs.Connected = true
522         xs._NotifyState()
523         return nil
524 }
525
526 // Create WebSocket (io.socket) connection
527 func (xs *XdsServer) _SocketConnect() error {
528
529         xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
530
531         opts := &sio_client.Options{
532                 Transport: "websocket",
533                 Header:    make(map[string][]string),
534         }
535         opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
536
537         iosk, err := sio_client.NewClient(xs.BaseURL, opts)
538         if err != nil {
539                 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
540         }
541         xs.ioSock = iosk
542
543         // Register some listeners
544
545         iosk.On("error", func(err error) {
546                 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
547                 if xs.CBOnError != nil {
548                         xs.CBOnError(err)
549                 }
550         })
551
552         iosk.On("disconnection", func(err error) {
553                 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
554                 if xs.CBOnDisconnect != nil {
555                         xs.CBOnDisconnect(err)
556                 }
557                 xs.Connected = false
558                 xs._NotifyState()
559
560                 // Try to reconnect during 15min (or at least while not disabled)
561                 go func() {
562                         count := 0
563                         waitTime := 1
564                         for !xs.Disabled && !xs.Connected {
565                                 count++
566                                 if count%60 == 0 {
567                                         waitTime *= 5
568                                 }
569                                 if waitTime > 15*60 {
570                                         xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
571                                         return
572                                 }
573                                 time.Sleep(time.Second * time.Duration(waitTime))
574                                 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
575
576                                 xs._reconnect()
577                         }
578                 }()
579         })
580
581         // XXX - There is no connection event generated so, just consider that
582         // we are connected when NewClient return successfully
583         /* iosk.On("connection", func() { ... }) */
584         xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
585
586         return nil
587 }
588
589 // Send event to notify changes
590 func (xs *XdsServer) _NotifyState() {
591
592         evSts := ServerCfg{
593                 ID:         xs.ID,
594                 URL:        xs.BaseURL,
595                 APIURL:     xs.APIURL,
596                 PartialURL: xs.PartialURL,
597                 ConnRetry:  xs.ConnRetry,
598                 Connected:  xs.Connected,
599         }
600         if err := xs.events.Emit(EVTServerConfig, evSts); err != nil {
601                 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
602         }
603 }