Renamed apiv1 lib to xaapiv1.
[src/xds/xds-agent.git] / lib / agent / xdsserver.go
1 package agent
2
3 import (
4         "encoding/json"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "net/http"
9         "strings"
10         "sync"
11         "time"
12
13         "github.com/gin-gonic/gin"
14         "github.com/iotbzh/xds-agent/lib/xaapiv1"
15         "github.com/iotbzh/xds-agent/lib/xdsconfig"
16         common "github.com/iotbzh/xds-common/golib"
17         "github.com/iotbzh/xds-server/lib/xsapiv1"
18         uuid "github.com/satori/go.uuid"
19         sio_client "github.com/sebd71/go-socket.io-client"
20 )
21
22 // XdsServer .
23 type XdsServer struct {
24         *Context
25         ID           string
26         BaseURL      string
27         APIURL       string
28         PartialURL   string
29         ConnRetry    int
30         Connected    bool
31         Disabled     bool
32         ServerConfig *xsapiv1.APIConfig
33
34         // Events management
35         CBOnError      func(error)
36         CBOnDisconnect func(error)
37         sockEvents     map[string][]*caller
38         sockEventsLock *sync.Mutex
39
40         // Private fields
41         client    *common.HTTPClient
42         ioSock    *sio_client.Client
43         logOut    io.Writer
44         apiRouter *gin.RouterGroup
45 }
46
// EventCB is the signature of a callback registered with EventOn. It
// receives the private data given at registration time followed by the
// data carried by the event.
type EventCB func(privData, evtData interface{}) error
49
50 // caller Used to chain event listeners
51 type caller struct {
52         id          uuid.UUID
53         EventName   string
54         Func        EventCB
55         PrivateData interface{}
56 }
57
const (
	// _IDTempoPrefix marks a server ID generated locally by NewXdsServer
	// (see IsTempoID).
	_IDTempoPrefix = "tempo-"
)
59
60 // NewXdsServer creates an instance of XdsServer
61 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
62         return &XdsServer{
63                 Context:    ctx,
64                 ID:         _IDTempoPrefix + uuid.NewV1().String(),
65                 BaseURL:    conf.URL,
66                 APIURL:     conf.APIBaseURL + conf.APIPartialURL,
67                 PartialURL: conf.APIPartialURL,
68                 ConnRetry:  conf.ConnRetry,
69                 Connected:  false,
70                 Disabled:   false,
71
72                 sockEvents:     make(map[string][]*caller),
73                 sockEventsLock: &sync.Mutex{},
74                 logOut:         ctx.Log.Out,
75         }
76 }
77
78 // Close Free and close XDS Server connection
79 func (xs *XdsServer) Close() error {
80         xs.Connected = false
81         xs.Disabled = true
82         xs.ioSock = nil
83         xs._NotifyState()
84         return nil
85 }
86
87 // Connect Establish HTTP connection with XDS Server
88 func (xs *XdsServer) Connect() error {
89         var err error
90         var retry int
91
92         xs.Disabled = false
93         xs.Connected = false
94
95         err = nil
96         for retry = xs.ConnRetry; retry > 0; retry-- {
97                 if err = xs._CreateConnectHTTP(); err == nil {
98                         break
99                 }
100                 if retry == xs.ConnRetry {
101                         // Notify only on the first conn error
102                         // doing that avoid 2 notifs (conn false; conn true) on startup
103                         xs._NotifyState()
104                 }
105                 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
106                 time.Sleep(time.Second)
107         }
108         if retry == 0 {
109                 // FIXME: re-use _reconnect to wait longer in background
110                 return fmt.Errorf("Connection to XDS Server failure")
111         }
112         if err != nil {
113                 return err
114         }
115
116         // Check HTTP connection and establish WS connection
117         err = xs._connect(false)
118
119         return err
120 }
121
122 // IsTempoID returns true when server as a temporary id
123 func (xs *XdsServer) IsTempoID() bool {
124         return strings.HasPrefix(xs.ID, _IDTempoPrefix)
125 }
126
127 // SetLoggerOutput Set logger ou
128 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
129         xs.logOut = out
130 }
131
132 // SendCommand Send a command to XDS Server
133 func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error {
134         url := cmd
135         if !strings.HasPrefix("/", cmd) {
136                 url = "/" + cmd
137         }
138         return xs.client.Post(url, string(body), res)
139 }
140
141 // GetVersion Send Get request to retrieve XDS Server version
142 func (xs *XdsServer) GetVersion(res interface{}) error {
143         return xs.client.Get("/version", &res)
144 }
145
146 // GetFolders Send GET request to get current folder configuration
147 func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error {
148         return xs.client.Get("/folders", folders)
149 }
150
151 // FolderAdd Send POST request to add a folder
152 func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error {
153         err := xs.client.Post("/folders", fld, res)
154         if err != nil {
155                 return fmt.Errorf("FolderAdd error: %s", err.Error())
156         }
157         return err
158 }
159
160 // FolderDelete Send DELETE request to delete a folder
161 func (xs *XdsServer) FolderDelete(id string) error {
162         return xs.client.HTTPDelete("/folders/" + id)
163 }
164
165 // FolderSync Send POST request to force synchronization of a folder
166 func (xs *XdsServer) FolderSync(id string) error {
167         return xs.client.HTTPPost("/folders/sync/"+id, "")
168 }
169
170 // FolderUpdate Send PUT request to update a folder
171 func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error {
172         return xs.client.Put("/folders/"+fld.ID, fld, resFld)
173 }
174
175 // SetAPIRouterGroup .
176 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
177         xs.apiRouter = r
178 }
179
180 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
181 func (xs *XdsServer) PassthroughGet(url string) {
182         if xs.apiRouter == nil {
183                 xs.Log.Errorf("apiRouter not set !")
184                 return
185         }
186
187         xs.apiRouter.GET(url, func(c *gin.Context) {
188                 var data interface{}
189                 // Take care of param (eg. id in /projects/:id)
190                 nURL := url
191                 if strings.Contains(url, ":") {
192                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
193                 }
194                 // Send Get request
195                 if err := xs.client.Get(nURL, &data); err != nil {
196                         if strings.Contains(err.Error(), "connection refused") {
197                                 xs.Connected = false
198                                 xs._NotifyState()
199                         }
200                         common.APIError(c, err.Error())
201                         return
202                 }
203
204                 c.JSON(http.StatusOK, data)
205         })
206 }
207
208 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
209 func (xs *XdsServer) PassthroughPost(url string) {
210         if xs.apiRouter == nil {
211                 xs.Log.Errorf("apiRouter not set !")
212                 return
213         }
214
215         xs.apiRouter.POST(url, func(c *gin.Context) {
216                 bodyReq := []byte{}
217                 n, err := c.Request.Body.Read(bodyReq)
218                 if err != nil {
219                         common.APIError(c, err.Error())
220                         return
221                 }
222
223                 // Take care of param (eg. id in /projects/:id)
224                 nURL := url
225                 if strings.Contains(url, ":") {
226                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
227                 }
228
229                 // Send Post request
230                 body, err := json.Marshal(bodyReq[:n])
231                 if err != nil {
232                         common.APIError(c, err.Error())
233                         return
234                 }
235
236                 response, err := xs.client.HTTPPostWithRes(nURL, string(body))
237                 if err != nil {
238                         common.APIError(c, err.Error())
239                         return
240                 }
241
242                 bodyRes, err := ioutil.ReadAll(response.Body)
243                 if err != nil {
244                         common.APIError(c, "Cannot read response body")
245                         return
246                 }
247                 c.JSON(http.StatusOK, string(bodyRes))
248         })
249 }
250
251 // EventRegister Post a request to register to an XdsServer event
252 func (xs *XdsServer) EventRegister(evName string, id string) error {
253         return xs.client.Post(
254                 "/events/register",
255                 xsapiv1.EventRegisterArgs{
256                         Name:      evName,
257                         ProjectID: id,
258                 },
259                 nil)
260 }
261
262 // EventOn Register a callback on events reception
263 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
264         if xs.ioSock == nil {
265                 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
266         }
267
268         xs.sockEventsLock.Lock()
269         defer xs.sockEventsLock.Unlock()
270
271         if _, exist := xs.sockEvents[evName]; !exist {
272                 // Register listener only the first time
273                 evn := evName
274
275                 // FIXME: use generic type: data interface{} instead of data xsapiv1.EventMsg
276                 var err error
277                 if evName == "event:folder-state-change" {
278                         err = xs.ioSock.On(evn, func(data xsapiv1.EventMsg) error {
279                                 xs.sockEventsLock.Lock()
280                                 sEvts := make([]*caller, len(xs.sockEvents[evn]))
281                                 copy(sEvts, xs.sockEvents[evn])
282                                 xs.sockEventsLock.Unlock()
283                                 for _, c := range sEvts {
284                                         c.Func(c.PrivateData, data)
285                                 }
286                                 return nil
287                         })
288                 } else {
289                         err = xs.ioSock.On(evn, func(data interface{}) error {
290                                 xs.sockEventsLock.Lock()
291                                 sEvts := make([]*caller, len(xs.sockEvents[evn]))
292                                 copy(sEvts, xs.sockEvents[evn])
293                                 xs.sockEventsLock.Unlock()
294                                 for _, c := range sEvts {
295                                         c.Func(c.PrivateData, data)
296                                 }
297                                 return nil
298                         })
299                 }
300                 if err != nil {
301                         return uuid.Nil, err
302                 }
303         }
304
305         c := &caller{
306                 id:          uuid.NewV1(),
307                 EventName:   evName,
308                 Func:        f,
309                 PrivateData: privData,
310         }
311
312         xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
313         return c.id, nil
314 }
315
316 // EventOff Un-register a (or all) callbacks associated to an event
317 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
318         xs.sockEventsLock.Lock()
319         defer xs.sockEventsLock.Unlock()
320         if _, exist := xs.sockEvents[evName]; exist {
321                 if id == uuid.Nil {
322                         // Un-register all
323                         xs.sockEvents[evName] = []*caller{}
324                 } else {
325                         // Un-register only the specified callback
326                         for i, ff := range xs.sockEvents[evName] {
327                                 if uuid.Equal(ff.id, id) {
328                                         xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
329                                         break
330                                 }
331                         }
332                 }
333         }
334         return nil
335 }
336
337 // ProjectToFolder Convert Project structure to Folder structure
338 func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig {
339         stID := ""
340         if pPrj.Type == xsapiv1.TypeCloudSync {
341                 stID, _ = xs.SThg.IDGet()
342         }
343         // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
344         fPrj := xsapiv1.FolderConfig{
345                 ID:         pPrj.ID,
346                 Label:      pPrj.Label,
347                 ClientPath: pPrj.ClientPath,
348                 Type:       xsapiv1.FolderType(pPrj.Type),
349                 Status:     pPrj.Status,
350                 IsInSync:   pPrj.IsInSync,
351                 DefaultSdk: pPrj.DefaultSdk,
352                 ClientData: pPrj.ClientData,
353                 DataPathMap: xsapiv1.PathMapConfig{
354                         ServerPath: pPrj.ServerPath,
355                 },
356                 DataCloudSync: xsapiv1.CloudSyncConfig{
357                         SyncThingID:   stID,
358                         STLocIsInSync: pPrj.IsInSync,
359                         STLocStatus:   pPrj.Status,
360                         STSvrIsInSync: pPrj.IsInSync,
361                         STSvrStatus:   pPrj.Status,
362                 },
363         }
364
365         return &fPrj
366 }
367
368 // FolderToProject Convert Folder structure to Project structure
369 func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig {
370         inSync := fPrj.IsInSync
371         sts := fPrj.Status
372
373         if fPrj.Type == xsapiv1.TypeCloudSync {
374                 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
375
376                 sts = fPrj.DataCloudSync.STSvrStatus
377                 switch fPrj.DataCloudSync.STLocStatus {
378                 case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause:
379                         sts = fPrj.DataCloudSync.STLocStatus
380                         break
381                 case xaapiv1.StatusSyncing:
382                         if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause {
383                                 sts = xaapiv1.StatusSyncing
384                         }
385                         break
386                 case xaapiv1.StatusEnable:
387                         // keep STSvrStatus
388                         break
389                 }
390         }
391
392         pPrj := xaapiv1.ProjectConfig{
393                 ID:         fPrj.ID,
394                 ServerID:   xs.ID,
395                 Label:      fPrj.Label,
396                 ClientPath: fPrj.ClientPath,
397                 ServerPath: fPrj.DataPathMap.ServerPath,
398                 Type:       xaapiv1.ProjectType(fPrj.Type),
399                 Status:     sts,
400                 IsInSync:   inSync,
401                 DefaultSdk: fPrj.DefaultSdk,
402                 ClientData: fPrj.ClientData,
403         }
404         return pPrj
405 }
406
407 /***
408 ** Private functions
409 ***/
410
411 // Create HTTP client
412 func (xs *XdsServer) _CreateConnectHTTP() error {
413         var err error
414         xs.client, err = common.HTTPNewClient(xs.BaseURL,
415                 common.HTTPClientConfig{
416                         URLPrefix:           "/api/v1",
417                         HeaderClientKeyName: "Xds-Sid",
418                         CsrfDisable:         true,
419                         LogOut:              xs.logOut,
420                         LogPrefix:           "XDSSERVER: ",
421                         LogLevel:            common.HTTPLogLevelWarning,
422                 })
423
424         xs.client.SetLogLevel(xs.Log.Level.String())
425
426         if err != nil {
427                 msg := ": " + err.Error()
428                 if strings.Contains(err.Error(), "connection refused") {
429                         msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
430                 }
431                 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
432         }
433         if xs.client == nil {
434                 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
435         }
436
437         return nil
438 }
439
440 //  Re-established connection
441 func (xs *XdsServer) _reconnect() error {
442         err := xs._connect(true)
443         if err == nil {
444                 // Reload projects list for this server
445                 err = xs.projects.Init(xs)
446         }
447         return err
448 }
449
450 //  Established HTTP and WS connection and retrieve XDSServer config
451 func (xs *XdsServer) _connect(reConn bool) error {
452
453         xdsCfg := xsapiv1.APIConfig{}
454         if err := xs.client.Get("/config", &xdsCfg); err != nil {
455                 xs.Connected = false
456                 if !reConn {
457                         xs._NotifyState()
458                 }
459                 return err
460         }
461
462         if reConn && xs.ID != xdsCfg.ServerUID {
463                 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID)
464         }
465
466         // Update local XDS config
467         xs.ID = xdsCfg.ServerUID
468         xs.ServerConfig = &xdsCfg
469
470         // Establish WS connection and register listen
471         if err := xs._SocketConnect(); err != nil {
472                 xs.Connected = false
473                 xs._NotifyState()
474                 return err
475         }
476
477         xs.Connected = true
478         xs._NotifyState()
479         return nil
480 }
481
482 // Create WebSocket (io.socket) connection
483 func (xs *XdsServer) _SocketConnect() error {
484
485         xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
486
487         opts := &sio_client.Options{
488                 Transport: "websocket",
489                 Header:    make(map[string][]string),
490         }
491         opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
492
493         iosk, err := sio_client.NewClient(xs.BaseURL, opts)
494         if err != nil {
495                 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
496         }
497         xs.ioSock = iosk
498
499         // Register some listeners
500
501         iosk.On("error", func(err error) {
502                 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
503                 if xs.CBOnError != nil {
504                         xs.CBOnError(err)
505                 }
506         })
507
508         iosk.On("disconnection", func(err error) {
509                 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
510                 if xs.CBOnDisconnect != nil {
511                         xs.CBOnDisconnect(err)
512                 }
513                 xs.Connected = false
514                 xs._NotifyState()
515
516                 // Try to reconnect during 15min (or at least while not disabled)
517                 go func() {
518                         count := 0
519                         waitTime := 1
520                         for !xs.Disabled && !xs.Connected {
521                                 count++
522                                 if count%60 == 0 {
523                                         waitTime *= 5
524                                 }
525                                 if waitTime > 15*60 {
526                                         xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
527                                         return
528                                 }
529                                 time.Sleep(time.Second * time.Duration(waitTime))
530                                 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
531
532                                 xs._reconnect()
533                         }
534                 }()
535         })
536
537         // XXX - There is no connection event generated so, just consider that
538         // we are connected when NewClient return successfully
539         /* iosk.On("connection", func() { ... }) */
540         xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
541
542         return nil
543 }
544
545 // Send event to notify changes
546 func (xs *XdsServer) _NotifyState() {
547
548         evSts := xaapiv1.ServerCfg{
549                 ID:         xs.ID,
550                 URL:        xs.BaseURL,
551                 APIURL:     xs.APIURL,
552                 PartialURL: xs.PartialURL,
553                 ConnRetry:  xs.ConnRetry,
554                 Connected:  xs.Connected,
555         }
556         if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil {
557                 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
558         }
559 }