e2c38c1bbecbf1780cdbb0dec5984a2160464ba5
[src/xds/xds-agent.git] / lib / agent / xdsserver.go
1 package agent
2
3 import (
4         "encoding/json"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "net/http"
9         "strings"
10         "sync"
11         "time"
12
13         "github.com/gin-gonic/gin"
14         "github.com/iotbzh/xds-agent/lib/apiv1"
15         "github.com/iotbzh/xds-agent/lib/xdsconfig"
16         common "github.com/iotbzh/xds-common/golib"
17         uuid "github.com/satori/go.uuid"
18         sio_client "github.com/sebd71/go-socket.io-client"
19 )
20
21 // XdsServer .
22 type XdsServer struct {
23         *Context
24         ID           string
25         BaseURL      string
26         APIURL       string
27         PartialURL   string
28         ConnRetry    int
29         Connected    bool
30         Disabled     bool
31         ServerConfig *XdsServerConfig
32
33         // Events management
34         CBOnError      func(error)
35         CBOnDisconnect func(error)
36         sockEvents     map[string][]*caller
37         sockEventsLock *sync.Mutex
38
39         // Private fields
40         client    *common.HTTPClient
41         ioSock    *sio_client.Client
42         logOut    io.Writer
43         apiRouter *gin.RouterGroup
44 }
45
46 // XdsServerConfig Data return by GET /config
47 type XdsServerConfig struct {
48         ID               string           `json:"id"`
49         Version          string           `json:"version"`
50         APIVersion       string           `json:"apiVersion"`
51         VersionGitTag    string           `json:"gitTag"`
52         SupportedSharing map[string]bool  `json:"supportedSharing"`
53         Builder          XdsBuilderConfig `json:"builder"`
54 }
55
56 // XdsBuilderConfig represents the builder container configuration
57 type XdsBuilderConfig struct {
58         IP          string `json:"ip"`
59         Port        string `json:"port"`
60         SyncThingID string `json:"syncThingID"`
61 }
62
63 // XdsFolderType XdsServer folder type
64 type XdsFolderType string
65
66 const (
67         // XdsTypePathMap Path Mapping folder type
68         XdsTypePathMap = "PathMap"
69         // XdsTypeCloudSync Cloud synchronization (AKA syncthing) folder type
70         XdsTypeCloudSync = "CloudSync"
71         // XdsTypeCifsSmb CIFS (AKA samba) folder type
72         XdsTypeCifsSmb = "CIFS"
73 )
74
75 // XdsFolderConfig XdsServer folder config
76 type XdsFolderConfig struct {
77         ID         string        `json:"id"`
78         Label      string        `json:"label"`
79         ClientPath string        `json:"path"`
80         Type       XdsFolderType `json:"type"`
81         Status     string        `json:"status"`
82         IsInSync   bool          `json:"isInSync"`
83         DefaultSdk string        `json:"defaultSdk"`
84         ClientData string        `json:"clientData"` // free form field that can used by client
85
86         // Specific data depending on which Type is used
87         DataPathMap   XdsPathMapConfig   `json:"dataPathMap,omitempty"`
88         DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"`
89 }
90
91 // XdsPathMapConfig Path mapping specific data
92 type XdsPathMapConfig struct {
93         ServerPath   string `json:"serverPath"`
94         CheckFile    string `json:"checkFile"`
95         CheckContent string `json:"checkContent"`
96 }
97
98 // XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data
99 type XdsCloudSyncConfig struct {
100         SyncThingID   string `json:"syncThingID"`
101         STSvrStatus   string `json:"-"`
102         STSvrIsInSync bool   `json:"-"`
103         STLocStatus   string `json:"-"`
104         STLocIsInSync bool   `json:"-"`
105 }
106
107 // XdsEventRegisterArgs arguments used to register to XDS server events
108 type XdsEventRegisterArgs struct {
109         Name      string `json:"name"`
110         ProjectID string `json:"filterProjectID"`
111 }
112
113 // XdsEventFolderChange Folder change event structure
114 type XdsEventFolderChange struct {
115         Time   string          `json:"time"`
116         Type   string          `json:"type"`
117         Folder XdsFolderConfig `json:"folder"`
118 }
119
120 // EventCB Event emitter callback
121 type EventCB func(privData interface{}, evtData interface{}) error
122
123 // caller Used to chain event listeners
124 type caller struct {
125         id          uuid.UUID
126         EventName   string
127         Func        EventCB
128         PrivateData interface{}
129 }
130
131 const _IDTempoPrefix = "tempo-"
132
133 // NewXdsServer creates an instance of XdsServer
134 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
135         return &XdsServer{
136                 Context:    ctx,
137                 ID:         _IDTempoPrefix + uuid.NewV1().String(),
138                 BaseURL:    conf.URL,
139                 APIURL:     conf.APIBaseURL + conf.APIPartialURL,
140                 PartialURL: conf.APIPartialURL,
141                 ConnRetry:  conf.ConnRetry,
142                 Connected:  false,
143                 Disabled:   false,
144
145                 sockEvents:     make(map[string][]*caller),
146                 sockEventsLock: &sync.Mutex{},
147                 logOut:         ctx.Log.Out,
148         }
149 }
150
151 // Close Free and close XDS Server connection
152 func (xs *XdsServer) Close() error {
153         xs.Connected = false
154         xs.Disabled = true
155         xs.ioSock = nil
156         xs._NotifyState()
157         return nil
158 }
159
160 // Connect Establish HTTP connection with XDS Server
161 func (xs *XdsServer) Connect() error {
162         var err error
163         var retry int
164
165         xs.Disabled = false
166         xs.Connected = false
167
168         err = nil
169         for retry = xs.ConnRetry; retry > 0; retry-- {
170                 if err = xs._CreateConnectHTTP(); err == nil {
171                         break
172                 }
173                 if retry == xs.ConnRetry {
174                         // Notify only on the first conn error
175                         // doing that avoid 2 notifs (conn false; conn true) on startup
176                         xs._NotifyState()
177                 }
178                 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
179                 time.Sleep(time.Second)
180         }
181         if retry == 0 {
182                 // FIXME: re-use _reconnect to wait longer in background
183                 return fmt.Errorf("Connection to XDS Server failure")
184         }
185         if err != nil {
186                 return err
187         }
188
189         // Check HTTP connection and establish WS connection
190         err = xs._connect(false)
191
192         return err
193 }
194
195 // IsTempoID returns true when server as a temporary id
196 func (xs *XdsServer) IsTempoID() bool {
197         return strings.HasPrefix(xs.ID, _IDTempoPrefix)
198 }
199
200 // SetLoggerOutput Set logger ou
201 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
202         xs.logOut = out
203 }
204
205 // SendCommand Send a command to XDS Server
206 func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) {
207         url := cmd
208         if !strings.HasPrefix("/", cmd) {
209                 url = "/" + cmd
210         }
211         return xs.client.HTTPPostWithRes(url, string(body))
212 }
213
214 // GetVersion Send Get request to retrieve XDS Server version
215 func (xs *XdsServer) GetVersion(res interface{}) error {
216         return xs.client.Get("/version", &res)
217 }
218
219 // GetFolders Send GET request to get current folder configuration
220 func (xs *XdsServer) GetFolders(folders *[]XdsFolderConfig) error {
221         return xs.client.Get("/folders", folders)
222 }
223
224 // FolderAdd Send POST request to add a folder
225 func (xs *XdsServer) FolderAdd(fld *XdsFolderConfig, res interface{}) error {
226         err := xs.client.Post("/folders", fld, res)
227         if err != nil {
228                 return fmt.Errorf("FolderAdd error: %s", err.Error())
229         }
230         return err
231 }
232
233 // FolderDelete Send DELETE request to delete a folder
234 func (xs *XdsServer) FolderDelete(id string) error {
235         return xs.client.HTTPDelete("/folders/" + id)
236 }
237
238 // FolderSync Send POST request to force synchronization of a folder
239 func (xs *XdsServer) FolderSync(id string) error {
240         return xs.client.HTTPPost("/folders/sync/"+id, "")
241 }
242
243 // FolderUpdate Send PUT request to update a folder
244 func (xs *XdsServer) FolderUpdate(fld *XdsFolderConfig, resFld *XdsFolderConfig) error {
245         return xs.client.Put("/folders/"+fld.ID, fld, resFld)
246 }
247
248 // SetAPIRouterGroup .
249 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
250         xs.apiRouter = r
251 }
252
253 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
254 func (xs *XdsServer) PassthroughGet(url string) {
255         if xs.apiRouter == nil {
256                 xs.Log.Errorf("apiRouter not set !")
257                 return
258         }
259
260         xs.apiRouter.GET(url, func(c *gin.Context) {
261                 var data interface{}
262                 // Take care of param (eg. id in /projects/:id)
263                 nURL := url
264                 if strings.Contains(url, ":") {
265                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
266                 }
267                 // Send Get request
268                 if err := xs.client.Get(nURL, &data); err != nil {
269                         if strings.Contains(err.Error(), "connection refused") {
270                                 xs.Connected = false
271                                 xs._NotifyState()
272                         }
273                         common.APIError(c, err.Error())
274                         return
275                 }
276
277                 c.JSON(http.StatusOK, data)
278         })
279 }
280
281 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
282 func (xs *XdsServer) PassthroughPost(url string) {
283         if xs.apiRouter == nil {
284                 xs.Log.Errorf("apiRouter not set !")
285                 return
286         }
287
288         xs.apiRouter.POST(url, func(c *gin.Context) {
289                 bodyReq := []byte{}
290                 n, err := c.Request.Body.Read(bodyReq)
291                 if err != nil {
292                         common.APIError(c, err.Error())
293                         return
294                 }
295
296                 // Take care of param (eg. id in /projects/:id)
297                 nURL := url
298                 if strings.Contains(url, ":") {
299                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
300                 }
301
302                 // Send Post request
303                 body, err := json.Marshal(bodyReq[:n])
304                 if err != nil {
305                         common.APIError(c, err.Error())
306                         return
307                 }
308
309                 response, err := xs.client.HTTPPostWithRes(nURL, string(body))
310                 if err != nil {
311                         common.APIError(c, err.Error())
312                         return
313                 }
314
315                 bodyRes, err := ioutil.ReadAll(response.Body)
316                 if err != nil {
317                         common.APIError(c, "Cannot read response body")
318                         return
319                 }
320                 c.JSON(http.StatusOK, string(bodyRes))
321         })
322 }
323
324 // EventRegister Post a request to register to an XdsServer event
325 func (xs *XdsServer) EventRegister(evName string, id string) error {
326         return xs.client.Post(
327                 "/events/register",
328                 XdsEventRegisterArgs{
329                         Name:      evName,
330                         ProjectID: id,
331                 },
332                 nil)
333 }
334
335 // EventOn Register a callback on events reception
336 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
337         if xs.ioSock == nil {
338                 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
339         }
340
341         xs.sockEventsLock.Lock()
342         defer xs.sockEventsLock.Unlock()
343
344         if _, exist := xs.sockEvents[evName]; !exist {
345                 // Register listener only the first time
346                 evn := evName
347
348                 // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange
349                 var err error
350                 if evName == "event:folder-state-change" {
351                         err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
352                                 xs.sockEventsLock.Lock()
353                                 sEvts := make([]*caller, len(xs.sockEvents[evn]))
354                                 copy(sEvts, xs.sockEvents[evn])
355                                 xs.sockEventsLock.Unlock()
356                                 for _, c := range sEvts {
357                                         c.Func(c.PrivateData, data)
358                                 }
359                                 return nil
360                         })
361                 } else {
362                         err = xs.ioSock.On(evn, func(data interface{}) error {
363                                 xs.sockEventsLock.Lock()
364                                 sEvts := make([]*caller, len(xs.sockEvents[evn]))
365                                 copy(sEvts, xs.sockEvents[evn])
366                                 xs.sockEventsLock.Unlock()
367                                 for _, c := range sEvts {
368                                         c.Func(c.PrivateData, data)
369                                 }
370                                 return nil
371                         })
372                 }
373                 if err != nil {
374                         return uuid.Nil, err
375                 }
376         }
377
378         c := &caller{
379                 id:          uuid.NewV1(),
380                 EventName:   evName,
381                 Func:        f,
382                 PrivateData: privData,
383         }
384
385         xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
386         return c.id, nil
387 }
388
389 // EventOff Un-register a (or all) callbacks associated to an event
390 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
391         xs.sockEventsLock.Lock()
392         defer xs.sockEventsLock.Unlock()
393         if _, exist := xs.sockEvents[evName]; exist {
394                 if id == uuid.Nil {
395                         // Un-register all
396                         xs.sockEvents[evName] = []*caller{}
397                 } else {
398                         // Un-register only the specified callback
399                         for i, ff := range xs.sockEvents[evName] {
400                                 if uuid.Equal(ff.id, id) {
401                                         xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
402                                         break
403                                 }
404                         }
405                 }
406         }
407         return nil
408 }
409
410 // ProjectToFolder Convert Project structure to Folder structure
411 func (xs *XdsServer) ProjectToFolder(pPrj apiv1.ProjectConfig) *XdsFolderConfig {
412         stID := ""
413         if pPrj.Type == XdsTypeCloudSync {
414                 stID, _ = xs.SThg.IDGet()
415         }
416         // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
417         fPrj := XdsFolderConfig{
418                 ID:         pPrj.ID,
419                 Label:      pPrj.Label,
420                 ClientPath: pPrj.ClientPath,
421                 Type:       XdsFolderType(pPrj.Type),
422                 Status:     pPrj.Status,
423                 IsInSync:   pPrj.IsInSync,
424                 DefaultSdk: pPrj.DefaultSdk,
425                 ClientData: pPrj.ClientData,
426                 DataPathMap: XdsPathMapConfig{
427                         ServerPath: pPrj.ServerPath,
428                 },
429                 DataCloudSync: XdsCloudSyncConfig{
430                         SyncThingID:   stID,
431                         STLocIsInSync: pPrj.IsInSync,
432                         STLocStatus:   pPrj.Status,
433                         STSvrIsInSync: pPrj.IsInSync,
434                         STSvrStatus:   pPrj.Status,
435                 },
436         }
437
438         return &fPrj
439 }
440
441 // FolderToProject Convert Folder structure to Project structure
442 func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) apiv1.ProjectConfig {
443         inSync := fPrj.IsInSync
444         sts := fPrj.Status
445
446         if fPrj.Type == XdsTypeCloudSync {
447                 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
448
449                 sts = fPrj.DataCloudSync.STSvrStatus
450                 switch fPrj.DataCloudSync.STLocStatus {
451                 case apiv1.StatusErrorConfig, apiv1.StatusDisable, apiv1.StatusPause:
452                         sts = fPrj.DataCloudSync.STLocStatus
453                         break
454                 case apiv1.StatusSyncing:
455                         if sts != apiv1.StatusErrorConfig && sts != apiv1.StatusDisable && sts != apiv1.StatusPause {
456                                 sts = apiv1.StatusSyncing
457                         }
458                         break
459                 case apiv1.StatusEnable:
460                         // keep STSvrStatus
461                         break
462                 }
463         }
464
465         pPrj := apiv1.ProjectConfig{
466                 ID:         fPrj.ID,
467                 ServerID:   xs.ID,
468                 Label:      fPrj.Label,
469                 ClientPath: fPrj.ClientPath,
470                 ServerPath: fPrj.DataPathMap.ServerPath,
471                 Type:       apiv1.ProjectType(fPrj.Type),
472                 Status:     sts,
473                 IsInSync:   inSync,
474                 DefaultSdk: fPrj.DefaultSdk,
475                 ClientData: fPrj.ClientData,
476         }
477         return pPrj
478 }
479
480 /***
481 ** Private functions
482 ***/
483
484 // Create HTTP client
485 func (xs *XdsServer) _CreateConnectHTTP() error {
486         var err error
487         xs.client, err = common.HTTPNewClient(xs.BaseURL,
488                 common.HTTPClientConfig{
489                         URLPrefix:           "/api/v1",
490                         HeaderClientKeyName: "Xds-Sid",
491                         CsrfDisable:         true,
492                         LogOut:              xs.logOut,
493                         LogPrefix:           "XDSSERVER: ",
494                         LogLevel:            common.HTTPLogLevelWarning,
495                 })
496
497         xs.client.SetLogLevel(xs.Log.Level.String())
498
499         if err != nil {
500                 msg := ": " + err.Error()
501                 if strings.Contains(err.Error(), "connection refused") {
502                         msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
503                 }
504                 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
505         }
506         if xs.client == nil {
507                 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
508         }
509
510         return nil
511 }
512
513 //  Re-established connection
514 func (xs *XdsServer) _reconnect() error {
515         err := xs._connect(true)
516         if err == nil {
517                 // Reload projects list for this server
518                 err = xs.projects.Init(xs)
519         }
520         return err
521 }
522
523 //  Established HTTP and WS connection and retrieve XDSServer config
524 func (xs *XdsServer) _connect(reConn bool) error {
525
526         xdsCfg := XdsServerConfig{}
527         if err := xs.client.Get("/config", &xdsCfg); err != nil {
528                 xs.Connected = false
529                 if !reConn {
530                         xs._NotifyState()
531                 }
532                 return err
533         }
534
535         if reConn && xs.ID != xdsCfg.ID {
536                 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
537         }
538
539         // Update local XDS config
540         xs.ID = xdsCfg.ID
541         xs.ServerConfig = &xdsCfg
542
543         // Establish WS connection and register listen
544         if err := xs._SocketConnect(); err != nil {
545                 xs.Connected = false
546                 xs._NotifyState()
547                 return err
548         }
549
550         xs.Connected = true
551         xs._NotifyState()
552         return nil
553 }
554
555 // Create WebSocket (io.socket) connection
556 func (xs *XdsServer) _SocketConnect() error {
557
558         xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
559
560         opts := &sio_client.Options{
561                 Transport: "websocket",
562                 Header:    make(map[string][]string),
563         }
564         opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
565
566         iosk, err := sio_client.NewClient(xs.BaseURL, opts)
567         if err != nil {
568                 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
569         }
570         xs.ioSock = iosk
571
572         // Register some listeners
573
574         iosk.On("error", func(err error) {
575                 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
576                 if xs.CBOnError != nil {
577                         xs.CBOnError(err)
578                 }
579         })
580
581         iosk.On("disconnection", func(err error) {
582                 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
583                 if xs.CBOnDisconnect != nil {
584                         xs.CBOnDisconnect(err)
585                 }
586                 xs.Connected = false
587                 xs._NotifyState()
588
589                 // Try to reconnect during 15min (or at least while not disabled)
590                 go func() {
591                         count := 0
592                         waitTime := 1
593                         for !xs.Disabled && !xs.Connected {
594                                 count++
595                                 if count%60 == 0 {
596                                         waitTime *= 5
597                                 }
598                                 if waitTime > 15*60 {
599                                         xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
600                                         return
601                                 }
602                                 time.Sleep(time.Second * time.Duration(waitTime))
603                                 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
604
605                                 xs._reconnect()
606                         }
607                 }()
608         })
609
610         // XXX - There is no connection event generated so, just consider that
611         // we are connected when NewClient return successfully
612         /* iosk.On("connection", func() { ... }) */
613         xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
614
615         return nil
616 }
617
618 // Send event to notify changes
619 func (xs *XdsServer) _NotifyState() {
620
621         evSts := apiv1.ServerCfg{
622                 ID:         xs.ID,
623                 URL:        xs.BaseURL,
624                 APIURL:     xs.APIURL,
625                 PartialURL: xs.PartialURL,
626                 ConnRetry:  xs.ConnRetry,
627                 Connected:  xs.Connected,
628         }
629         if err := xs.events.Emit(apiv1.EVTServerConfig, evSts, ""); err != nil {
630                 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
631         }
632 }