Added new SDKs management support
[src/xds/xds-agent.git] / lib / agent / xdsserver.go
1 /*
2  * Copyright (C) 2017 "IoT.bzh"
3  * Author Sebastien Douheret <sebastien@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package agent
19
20 import (
21         "encoding/json"
22         "fmt"
23         "io"
24         "net/http"
25         "strings"
26         "sync"
27         "time"
28
29         "github.com/gin-gonic/gin"
30         "github.com/iotbzh/xds-agent/lib/xaapiv1"
31         "github.com/iotbzh/xds-agent/lib/xdsconfig"
32         common "github.com/iotbzh/xds-common/golib"
33         "github.com/iotbzh/xds-server/lib/xsapiv1"
34         uuid "github.com/satori/go.uuid"
35         sio_client "github.com/sebd71/go-socket.io-client"
36 )
37
38 // XdsServer .
39 type XdsServer struct {
40         *Context
41         ID           string
42         BaseURL      string
43         APIURL       string
44         PartialURL   string
45         ConnRetry    int
46         Connected    bool
47         Disabled     bool
48         ServerConfig *xsapiv1.APIConfig
49
50         // Events management
51         CBOnError      func(error)
52         CBOnDisconnect func(error)
53         sockEvents     map[string][]*caller
54         sockEventsLock *sync.Mutex
55
56         // Private fields
57         client    *common.HTTPClient
58         ioSock    *sio_client.Client
59         logOut    io.Writer
60         apiRouter *gin.RouterGroup
61         cmdList   map[string]interface{}
62 }
63
64 // EventCB Event emitter callback
65 type EventCB func(privData interface{}, evtData interface{}) error
66
67 // caller Used to chain event listeners
68 type caller struct {
69         id          uuid.UUID
70         EventName   string
71         Func        EventCB
72         PrivateData interface{}
73 }
74
75 const _IDTempoPrefix = "tempo-"
76
77 // NewXdsServer creates an instance of XdsServer
78 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
79         return &XdsServer{
80                 Context:    ctx,
81                 ID:         _IDTempoPrefix + uuid.NewV1().String(),
82                 BaseURL:    conf.URL,
83                 APIURL:     conf.APIBaseURL + conf.APIPartialURL,
84                 PartialURL: conf.APIPartialURL,
85                 ConnRetry:  conf.ConnRetry,
86                 Connected:  false,
87                 Disabled:   false,
88
89                 sockEvents:     make(map[string][]*caller),
90                 sockEventsLock: &sync.Mutex{},
91                 logOut:         ctx.Log.Out,
92                 cmdList:        make(map[string]interface{}),
93         }
94 }
95
96 // Close Free and close XDS Server connection
97 func (xs *XdsServer) Close() error {
98         err := xs._Disconnected()
99         xs.Disabled = true
100         return err
101 }
102
103 // Connect Establish HTTP connection with XDS Server
104 func (xs *XdsServer) Connect() error {
105         var err error
106         var retry int
107
108         xs.Disabled = false
109         xs.Connected = false
110
111         err = nil
112         for retry = xs.ConnRetry; retry > 0; retry-- {
113                 if err = xs._CreateConnectHTTP(); err == nil {
114                         break
115                 }
116                 if retry == xs.ConnRetry {
117                         // Notify only on the first conn error
118                         // doing that avoid 2 notifs (conn false; conn true) on startup
119                         xs._NotifyState()
120                 }
121                 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
122                 time.Sleep(time.Second)
123         }
124         if retry == 0 {
125                 // FIXME: re-use _Reconnect to wait longer in background
126                 return fmt.Errorf("Connection to XDS Server failure")
127         }
128         if err != nil {
129                 return err
130         }
131
132         // Check HTTP connection and establish WS connection
133         err = xs._Connect(false)
134
135         return err
136 }
137
138 // IsTempoID returns true when server as a temporary id
139 func (xs *XdsServer) IsTempoID() bool {
140         return strings.HasPrefix(xs.ID, _IDTempoPrefix)
141 }
142
143 // SetLoggerOutput Set logger ou
144 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
145         xs.logOut = out
146 }
147
148 // SendCommand Send a command to XDS Server
149 func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error {
150         url := cmd
151         if !strings.HasPrefix("/", cmd) {
152                 url = "/" + cmd
153         }
154         return xs.client.Post(url, string(body), res)
155 }
156
157 // GetVersion Send Get request to retrieve XDS Server version
158 func (xs *XdsServer) GetVersion(res interface{}) error {
159         return xs.client.Get("/version", &res)
160 }
161
162 // GetFolders Send GET request to get current folder configuration
163 func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error {
164         return xs.client.Get("/folders", folders)
165 }
166
167 // FolderAdd Send POST request to add a folder
168 func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error {
169         err := xs.client.Post("/folders", fld, res)
170         if err != nil {
171                 return fmt.Errorf("FolderAdd error: %s", err.Error())
172         }
173         return err
174 }
175
176 // FolderDelete Send DELETE request to delete a folder
177 func (xs *XdsServer) FolderDelete(id string) error {
178         return xs.client.HTTPDelete("/folders/" + id)
179 }
180
181 // FolderSync Send POST request to force synchronization of a folder
182 func (xs *XdsServer) FolderSync(id string) error {
183         return xs.client.HTTPPost("/folders/sync/"+id, "")
184 }
185
186 // FolderUpdate Send PUT request to update a folder
187 func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error {
188         return xs.client.Put("/folders/"+fld.ID, fld, resFld)
189 }
190
191 // CommandExec Send POST request to execute a command
192 func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error {
193         return xs.client.Post("/exec", args, res)
194 }
195
196 // CommandSignal Send POST request to send a signal to a command
197 func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error {
198         return xs.client.Post("/signal", args, res)
199 }
200
201 // SetAPIRouterGroup .
202 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
203         xs.apiRouter = r
204 }
205
206 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
207 func (xs *XdsServer) PassthroughGet(url string) {
208         if xs.apiRouter == nil {
209                 xs.Log.Errorf("apiRouter not set !")
210                 return
211         }
212
213         xs.apiRouter.GET(url, func(c *gin.Context) {
214                 var data interface{}
215                 // Take care of param (eg. id in /projects/:id)
216                 nURL := url
217                 if strings.Contains(url, ":") {
218                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
219                 }
220                 // Send Get request
221                 if err := xs.client.Get(nURL, &data); err != nil {
222                         if strings.Contains(err.Error(), "connection refused") {
223                                 xs._Disconnected()
224                         }
225                         common.APIError(c, err.Error())
226                         return
227                 }
228
229                 c.JSON(http.StatusOK, data)
230         })
231 }
232
233 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
234 func (xs *XdsServer) PassthroughPost(url string) {
235         if xs.apiRouter == nil {
236                 xs.Log.Errorf("apiRouter not set !")
237                 return
238         }
239
240         xs.apiRouter.POST(url, func(c *gin.Context) {
241                 var err error
242                 var data interface{}
243
244                 // Get raw body
245                 body, err := c.GetRawData()
246                 if err != nil {
247                         common.APIError(c, err.Error())
248                         return
249                 }
250
251                 // Take care of param (eg. id in /projects/:id)
252                 nURL := url
253                 if strings.Contains(url, ":") {
254                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
255                 }
256
257                 // Send Post request
258                 response, err := xs.client.HTTPPostWithRes(nURL, string(body))
259                 if err != nil {
260                         goto httpError
261                 }
262                 if response.StatusCode != 200 {
263                         err = fmt.Errorf(response.Status)
264                         return
265                 }
266                 err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
267                 if err != nil {
268                         goto httpError
269                 }
270
271                 c.JSON(http.StatusOK, data)
272                 return
273
274                 /* Handle error case */
275         httpError:
276                 if strings.Contains(err.Error(), "connection refused") {
277                         xs._Disconnected()
278                 }
279                 common.APIError(c, err.Error())
280         })
281 }
282
283 // PassthroughDelete Used to declare a route that sends directly a Delete request to XDS Server
284 func (xs *XdsServer) PassthroughDelete(url string) {
285         if xs.apiRouter == nil {
286                 xs.Log.Errorf("apiRouter not set !")
287                 return
288         }
289
290         xs.apiRouter.DELETE(url, func(c *gin.Context) {
291                 var err error
292                 var data interface{}
293
294                 // Take care of param (eg. id in /projects/:id)
295                 nURL := url
296                 if strings.Contains(url, ":") {
297                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
298                 }
299
300                 // Send Post request
301                 response, err := xs.client.HTTPDeleteWithRes(nURL)
302                 if err != nil {
303                         goto httpError
304                 }
305                 if response.StatusCode != 200 {
306                         err = fmt.Errorf(response.Status)
307                         return
308                 }
309                 err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
310                 if err != nil {
311                         goto httpError
312                 }
313
314                 c.JSON(http.StatusOK, data)
315                 return
316
317                 /* Handle error case */
318         httpError:
319                 if strings.Contains(err.Error(), "connection refused") {
320                         xs._Disconnected()
321                 }
322                 common.APIError(c, err.Error())
323         })
324 }
325
326 // EventRegister Post a request to register to an XdsServer event
327 func (xs *XdsServer) EventRegister(evName string, filter string) error {
328         return xs.client.Post("/events/register",
329                 xsapiv1.EventRegisterArgs{
330                         Name:   evName,
331                         Filter: filter,
332                 },
333                 nil)
334 }
335
336 // EventEmit Emit a event to XDS Server through WS
337 func (xs *XdsServer) EventEmit(message string, args ...interface{}) error {
338         if xs.ioSock == nil {
339                 return fmt.Errorf("Io.Socket not initialized")
340         }
341
342         return xs.ioSock.Emit(message, args...)
343 }
344
345 // EventOn Register a callback on events reception
346 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
347         if xs.ioSock == nil {
348                 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
349         }
350
351         xs.sockEventsLock.Lock()
352         defer xs.sockEventsLock.Unlock()
353
354         if _, exist := xs.sockEvents[evName]; !exist {
355                 // Register listener only the first time
356                 evn := evName
357
358                 err := xs.ioSock.On(evn, func(data interface{}) error {
359                         xs.sockEventsLock.Lock()
360                         sEvts := make([]*caller, len(xs.sockEvents[evn]))
361                         copy(sEvts, xs.sockEvents[evn])
362                         xs.sockEventsLock.Unlock()
363                         for _, c := range sEvts {
364                                 c.Func(c.PrivateData, data)
365                         }
366                         return nil
367                 })
368                 if err != nil {
369                         return uuid.Nil, err
370                 }
371         }
372
373         c := &caller{
374                 id:          uuid.NewV1(),
375                 EventName:   evName,
376                 Func:        f,
377                 PrivateData: privData,
378         }
379
380         xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
381         xs.LogSillyf("XS EventOn: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
382         return c.id, nil
383 }
384
385 // EventOff Un-register a (or all) callbacks associated to an event
386 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
387         xs.sockEventsLock.Lock()
388         defer xs.sockEventsLock.Unlock()
389         if _, exist := xs.sockEvents[evName]; exist {
390                 if id == uuid.Nil {
391                         // Un-register all
392                         xs.sockEvents[evName] = []*caller{}
393                 } else {
394                         // Un-register only the specified callback
395                         for i, ff := range xs.sockEvents[evName] {
396                                 if uuid.Equal(ff.id, id) {
397                                         xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
398                                         break
399                                 }
400                         }
401                 }
402         }
403         xs.LogSillyf("XS EventOff: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
404         return nil
405 }
406
407 // ProjectToFolder Convert Project structure to Folder structure
408 func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig {
409         stID := ""
410         if pPrj.Type == xsapiv1.TypeCloudSync {
411                 stID, _ = xs.SThg.IDGet()
412         }
413         // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
414         fPrj := xsapiv1.FolderConfig{
415                 ID:         pPrj.ID,
416                 Label:      pPrj.Label,
417                 ClientPath: pPrj.ClientPath,
418                 Type:       xsapiv1.FolderType(pPrj.Type),
419                 Status:     pPrj.Status,
420                 IsInSync:   pPrj.IsInSync,
421                 DefaultSdk: pPrj.DefaultSdk,
422                 ClientData: pPrj.ClientData,
423                 DataPathMap: xsapiv1.PathMapConfig{
424                         ServerPath: pPrj.ServerPath,
425                 },
426                 DataCloudSync: xsapiv1.CloudSyncConfig{
427                         SyncThingID:   stID,
428                         STLocIsInSync: pPrj.IsInSync,
429                         STLocStatus:   pPrj.Status,
430                         STSvrIsInSync: pPrj.IsInSync,
431                         STSvrStatus:   pPrj.Status,
432                 },
433         }
434
435         return &fPrj
436 }
437
438 // FolderToProject Convert Folder structure to Project structure
439 func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig {
440         inSync := fPrj.IsInSync
441         sts := fPrj.Status
442
443         if fPrj.Type == xsapiv1.TypeCloudSync {
444                 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
445
446                 sts = fPrj.DataCloudSync.STSvrStatus
447                 switch fPrj.DataCloudSync.STLocStatus {
448                 case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause:
449                         sts = fPrj.DataCloudSync.STLocStatus
450                         break
451                 case xaapiv1.StatusSyncing:
452                         if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause {
453                                 sts = xaapiv1.StatusSyncing
454                         }
455                         break
456                 case xaapiv1.StatusEnable:
457                         // keep STSvrStatus
458                         break
459                 }
460         }
461
462         pPrj := xaapiv1.ProjectConfig{
463                 ID:         fPrj.ID,
464                 ServerID:   xs.ID,
465                 Label:      fPrj.Label,
466                 ClientPath: fPrj.ClientPath,
467                 ServerPath: fPrj.DataPathMap.ServerPath,
468                 Type:       xaapiv1.ProjectType(fPrj.Type),
469                 Status:     sts,
470                 IsInSync:   inSync,
471                 DefaultSdk: fPrj.DefaultSdk,
472                 ClientData: fPrj.ClientData,
473         }
474         return pPrj
475 }
476
477 // CommandAdd Add a new command to the list of running commands
478 func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error {
479         if xs.CommandGet(cmdID) != nil {
480                 return fmt.Errorf("command id already exist")
481         }
482         xs.cmdList[cmdID] = data
483         return nil
484 }
485
486 // CommandDelete Delete a command from the command list
487 func (xs *XdsServer) CommandDelete(cmdID string) error {
488         if xs.CommandGet(cmdID) == nil {
489                 return fmt.Errorf("unknown command id")
490         }
491         delete(xs.cmdList, cmdID)
492         return nil
493 }
494
495 // CommandGet Retrieve a command data
496 func (xs *XdsServer) CommandGet(cmdID string) interface{} {
497         d, exist := xs.cmdList[cmdID]
498         if exist {
499                 return d
500         }
501         return nil
502 }
503
504 /***
505 ** Private functions
506 ***/
507
508 // Create HTTP client
509 func (xs *XdsServer) _CreateConnectHTTP() error {
510         var err error
511         xs.client, err = common.HTTPNewClient(xs.BaseURL,
512                 common.HTTPClientConfig{
513                         URLPrefix:           "/api/v1",
514                         HeaderClientKeyName: "Xds-Sid",
515                         CsrfDisable:         true,
516                         LogOut:              xs.logOut,
517                         LogPrefix:           "XDSSERVER: ",
518                         LogLevel:            common.HTTPLogLevelWarning,
519                 })
520
521         xs.client.SetLogLevel(xs.Log.Level.String())
522
523         if err != nil {
524                 msg := ": " + err.Error()
525                 if strings.Contains(err.Error(), "connection refused") {
526                         msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
527                 }
528                 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
529         }
530         if xs.client == nil {
531                 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
532         }
533
534         return nil
535 }
536
537 // _Reconnect Re-established connection
538 func (xs *XdsServer) _Reconnect() error {
539         err := xs._Connect(true)
540         if err == nil {
541                 // Reload projects list for this server
542                 err = xs.projects.Init(xs)
543         }
544         if err == nil {
545                 // Register again to all events
546                 err = xs.EventRegister(xsapiv1.EVTAll, "")
547         }
548         return err
549 }
550
551 // _Connect Established HTTP and WS connection and retrieve XDSServer config
552 func (xs *XdsServer) _Connect(reConn bool) error {
553
554         xdsCfg := xsapiv1.APIConfig{}
555         if err := xs.client.Get("/config", &xdsCfg); err != nil {
556                 xs.Connected = false
557                 if !reConn {
558                         xs._NotifyState()
559                 }
560                 return err
561         }
562
563         if reConn && xs.ID != xdsCfg.ServerUID {
564                 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID)
565         }
566
567         // Update local XDS config
568         xs.ID = xdsCfg.ServerUID
569         xs.ServerConfig = &xdsCfg
570
571         // Establish WS connection and register listen
572         if err := xs._SocketConnect(); err != nil {
573                 xs._Disconnected()
574                 return err
575         }
576
577         xs.Connected = true
578         xs._NotifyState()
579         return nil
580 }
581
582 // _SocketConnect Create WebSocket (io.socket) connection
583 func (xs *XdsServer) _SocketConnect() error {
584
585         xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
586
587         opts := &sio_client.Options{
588                 Transport: "websocket",
589                 Header:    make(map[string][]string),
590         }
591         opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
592
593         iosk, err := sio_client.NewClient(xs.BaseURL, opts)
594         if err != nil {
595                 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
596         }
597         xs.ioSock = iosk
598
599         // Register some listeners
600
601         iosk.On("error", func(err error) {
602                 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
603                 if xs.CBOnError != nil {
604                         xs.CBOnError(err)
605                 }
606         })
607
608         iosk.On("disconnection", func(err error) {
609                 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
610                 if xs.CBOnDisconnect != nil {
611                         xs.CBOnDisconnect(err)
612                 }
613                 xs._Disconnected()
614
615                 // Try to reconnect during 15min (or at least while not disabled)
616                 go func() {
617                         count := 0
618                         waitTime := 1
619                         for !xs.Disabled && !xs.Connected {
620                                 count++
621                                 if count%60 == 0 {
622                                         waitTime *= 5
623                                 }
624                                 if waitTime > 15*60 {
625                                         xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
626                                         return
627                                 }
628                                 time.Sleep(time.Second * time.Duration(waitTime))
629                                 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
630
631                                 err := xs._Reconnect()
632                                 if err != nil &&
633                                         !(strings.Contains(err.Error(), "dial tcp") && strings.Contains(err.Error(), "connection refused")) {
634                                         xs.Log.Errorf("ERROR while reconnecting: %v", err.Error())
635                                 }
636
637                         }
638                 }()
639         })
640
641         // XXX - There is no connection event generated so, just consider that
642         // we are connected when NewClient return successfully
643         /* iosk.On("connection", func() { ... }) */
644         xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
645
646         return nil
647 }
648
649 // _Disconnected Set XDS Server as disconnected
650 func (xs *XdsServer) _Disconnected() error {
651         // Clear all register events as socket is closed
652         for k := range xs.sockEvents {
653                 delete(xs.sockEvents, k)
654         }
655         xs.Connected = false
656         xs.ioSock = nil
657         xs._NotifyState()
658         return nil
659 }
660
661 // _NotifyState Send event to notify changes
662 func (xs *XdsServer) _NotifyState() {
663
664         evSts := xaapiv1.ServerCfg{
665                 ID:         xs.ID,
666                 URL:        xs.BaseURL,
667                 APIURL:     xs.APIURL,
668                 PartialURL: xs.PartialURL,
669                 ConnRetry:  xs.ConnRetry,
670                 Connected:  xs.Connected,
671         }
672         if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil {
673                 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
674         }
675 }