7020ef02663f83982e8b736df6da24fec62e3993
[src/xds/xds-agent.git] / lib / agent / xdsserver.go
1 /*
2  * Copyright (C) 2017 "IoT.bzh"
3  * Author Sebastien Douheret <sebastien@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package agent
19
20 import (
21         "encoding/json"
22         "fmt"
23         "io"
24         "io/ioutil"
25         "net/http"
26         "strings"
27         "sync"
28         "time"
29
30         "github.com/gin-gonic/gin"
31         "github.com/iotbzh/xds-agent/lib/xaapiv1"
32         "github.com/iotbzh/xds-agent/lib/xdsconfig"
33         common "github.com/iotbzh/xds-common/golib"
34         "github.com/iotbzh/xds-server/lib/xsapiv1"
35         uuid "github.com/satori/go.uuid"
36         sio_client "github.com/sebd71/go-socket.io-client"
37 )
38
39 // XdsServer .
40 type XdsServer struct {
41         *Context
42         ID           string
43         BaseURL      string
44         APIURL       string
45         PartialURL   string
46         ConnRetry    int
47         Connected    bool
48         Disabled     bool
49         ServerConfig *xsapiv1.APIConfig
50
51         // Events management
52         CBOnError      func(error)
53         CBOnDisconnect func(error)
54         sockEvents     map[string][]*caller
55         sockEventsLock *sync.Mutex
56
57         // Private fields
58         client    *common.HTTPClient
59         ioSock    *sio_client.Client
60         logOut    io.Writer
61         apiRouter *gin.RouterGroup
62         cmdList   map[string]interface{}
63 }
64
65 // EventCB Event emitter callback
66 type EventCB func(privData interface{}, evtData interface{}) error
67
68 // caller Used to chain event listeners
69 type caller struct {
70         id          uuid.UUID
71         EventName   string
72         Func        EventCB
73         PrivateData interface{}
74 }
75
76 const _IDTempoPrefix = "tempo-"
77
78 // NewXdsServer creates an instance of XdsServer
79 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
80         return &XdsServer{
81                 Context:    ctx,
82                 ID:         _IDTempoPrefix + uuid.NewV1().String(),
83                 BaseURL:    conf.URL,
84                 APIURL:     conf.APIBaseURL + conf.APIPartialURL,
85                 PartialURL: conf.APIPartialURL,
86                 ConnRetry:  conf.ConnRetry,
87                 Connected:  false,
88                 Disabled:   false,
89
90                 sockEvents:     make(map[string][]*caller),
91                 sockEventsLock: &sync.Mutex{},
92                 logOut:         ctx.Log.Out,
93                 cmdList:        make(map[string]interface{}),
94         }
95 }
96
97 // Close Free and close XDS Server connection
98 func (xs *XdsServer) Close() error {
99         err := xs._Disconnected()
100         xs.Disabled = true
101         return err
102 }
103
104 // Connect Establish HTTP connection with XDS Server
105 func (xs *XdsServer) Connect() error {
106         var err error
107         var retry int
108
109         xs.Disabled = false
110         xs.Connected = false
111
112         err = nil
113         for retry = xs.ConnRetry; retry > 0; retry-- {
114                 if err = xs._CreateConnectHTTP(); err == nil {
115                         break
116                 }
117                 if retry == xs.ConnRetry {
118                         // Notify only on the first conn error
119                         // doing that avoid 2 notifs (conn false; conn true) on startup
120                         xs._NotifyState()
121                 }
122                 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
123                 time.Sleep(time.Second)
124         }
125         if retry == 0 {
126                 // FIXME: re-use _Reconnect to wait longer in background
127                 return fmt.Errorf("Connection to XDS Server failure")
128         }
129         if err != nil {
130                 return err
131         }
132
133         // Check HTTP connection and establish WS connection
134         err = xs._Connect(false)
135
136         return err
137 }
138
139 // IsTempoID returns true when server as a temporary id
140 func (xs *XdsServer) IsTempoID() bool {
141         return strings.HasPrefix(xs.ID, _IDTempoPrefix)
142 }
143
144 // SetLoggerOutput Set logger ou
145 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
146         xs.logOut = out
147 }
148
149 // SendCommand Send a command to XDS Server
150 func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error {
151         url := cmd
152         if !strings.HasPrefix("/", cmd) {
153                 url = "/" + cmd
154         }
155         return xs.client.Post(url, string(body), res)
156 }
157
158 // GetVersion Send Get request to retrieve XDS Server version
159 func (xs *XdsServer) GetVersion(res interface{}) error {
160         return xs.client.Get("/version", &res)
161 }
162
163 // GetFolders Send GET request to get current folder configuration
164 func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error {
165         return xs.client.Get("/folders", folders)
166 }
167
168 // FolderAdd Send POST request to add a folder
169 func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error {
170         err := xs.client.Post("/folders", fld, res)
171         if err != nil {
172                 return fmt.Errorf("FolderAdd error: %s", err.Error())
173         }
174         return err
175 }
176
177 // FolderDelete Send DELETE request to delete a folder
178 func (xs *XdsServer) FolderDelete(id string) error {
179         return xs.client.HTTPDelete("/folders/" + id)
180 }
181
182 // FolderSync Send POST request to force synchronization of a folder
183 func (xs *XdsServer) FolderSync(id string) error {
184         return xs.client.HTTPPost("/folders/sync/"+id, "")
185 }
186
187 // FolderUpdate Send PUT request to update a folder
188 func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error {
189         return xs.client.Put("/folders/"+fld.ID, fld, resFld)
190 }
191
192 // CommandExec Send POST request to execute a command
193 func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error {
194         return xs.client.Post("/exec", args, res)
195 }
196
197 // CommandSignal Send POST request to send a signal to a command
198 func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error {
199         return xs.client.Post("/signal", args, res)
200 }
201
202 // SetAPIRouterGroup .
203 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
204         xs.apiRouter = r
205 }
206
207 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
208 func (xs *XdsServer) PassthroughGet(url string) {
209         if xs.apiRouter == nil {
210                 xs.Log.Errorf("apiRouter not set !")
211                 return
212         }
213
214         xs.apiRouter.GET(url, func(c *gin.Context) {
215                 var data interface{}
216                 // Take care of param (eg. id in /projects/:id)
217                 nURL := url
218                 if strings.Contains(url, ":") {
219                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
220                 }
221                 // Send Get request
222                 if err := xs.client.Get(nURL, &data); err != nil {
223                         if strings.Contains(err.Error(), "connection refused") {
224                                 xs._Disconnected()
225                         }
226                         common.APIError(c, err.Error())
227                         return
228                 }
229
230                 c.JSON(http.StatusOK, data)
231         })
232 }
233
234 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
235 func (xs *XdsServer) PassthroughPost(url string) {
236         if xs.apiRouter == nil {
237                 xs.Log.Errorf("apiRouter not set !")
238                 return
239         }
240
241         xs.apiRouter.POST(url, func(c *gin.Context) {
242                 bodyReq := []byte{}
243                 n, err := c.Request.Body.Read(bodyReq)
244                 if err != nil {
245                         common.APIError(c, err.Error())
246                         return
247                 }
248
249                 // Take care of param (eg. id in /projects/:id)
250                 nURL := url
251                 if strings.Contains(url, ":") {
252                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
253                 }
254
255                 // Send Post request
256                 body, err := json.Marshal(bodyReq[:n])
257                 if err != nil {
258                         common.APIError(c, err.Error())
259                         return
260                 }
261
262                 response, err := xs.client.HTTPPostWithRes(nURL, string(body))
263                 if err != nil {
264                         common.APIError(c, err.Error())
265                         return
266                 }
267
268                 bodyRes, err := ioutil.ReadAll(response.Body)
269                 if err != nil {
270                         common.APIError(c, "Cannot read response body")
271                         return
272                 }
273                 c.JSON(http.StatusOK, string(bodyRes))
274         })
275 }
276
277 // EventRegister Post a request to register to an XdsServer event
278 func (xs *XdsServer) EventRegister(evName string, id string) error {
279         return xs.client.Post(
280                 "/events/register",
281                 xsapiv1.EventRegisterArgs{
282                         Name:      evName,
283                         ProjectID: id,
284                 },
285                 nil)
286 }
287
288 // EventEmit Emit a event to XDS Server through WS
289 func (xs *XdsServer) EventEmit(message string, args ...interface{}) error {
290         if xs.ioSock == nil {
291                 return fmt.Errorf("Io.Socket not initialized")
292         }
293
294         return xs.ioSock.Emit(message, args...)
295 }
296
297 // EventOn Register a callback on events reception
298 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
299         if xs.ioSock == nil {
300                 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
301         }
302
303         xs.sockEventsLock.Lock()
304         defer xs.sockEventsLock.Unlock()
305
306         if _, exist := xs.sockEvents[evName]; !exist {
307                 // Register listener only the first time
308                 evn := evName
309
310                 // FIXME: use generic type: data interface{} instead of data xsapiv1.EventMsg
311                 var err error
312                 if evName == "event:folder-state-change" {
313                         err = xs.ioSock.On(evn, func(data xsapiv1.EventMsg) error {
314                                 xs.sockEventsLock.Lock()
315                                 sEvts := make([]*caller, len(xs.sockEvents[evn]))
316                                 copy(sEvts, xs.sockEvents[evn])
317                                 xs.sockEventsLock.Unlock()
318                                 for _, c := range sEvts {
319                                         c.Func(c.PrivateData, data)
320                                 }
321                                 return nil
322                         })
323                 } else {
324                         err = xs.ioSock.On(evn, func(data interface{}) error {
325                                 xs.sockEventsLock.Lock()
326                                 sEvts := make([]*caller, len(xs.sockEvents[evn]))
327                                 copy(sEvts, xs.sockEvents[evn])
328                                 xs.sockEventsLock.Unlock()
329                                 for _, c := range sEvts {
330                                         c.Func(c.PrivateData, data)
331                                 }
332                                 return nil
333                         })
334                 }
335                 if err != nil {
336                         return uuid.Nil, err
337                 }
338         }
339
340         c := &caller{
341                 id:          uuid.NewV1(),
342                 EventName:   evName,
343                 Func:        f,
344                 PrivateData: privData,
345         }
346
347         xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
348         xs.LogSillyf("XS EventOn: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
349         return c.id, nil
350 }
351
352 // EventOff Un-register a (or all) callbacks associated to an event
353 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
354         xs.sockEventsLock.Lock()
355         defer xs.sockEventsLock.Unlock()
356         if _, exist := xs.sockEvents[evName]; exist {
357                 if id == uuid.Nil {
358                         // Un-register all
359                         xs.sockEvents[evName] = []*caller{}
360                 } else {
361                         // Un-register only the specified callback
362                         for i, ff := range xs.sockEvents[evName] {
363                                 if uuid.Equal(ff.id, id) {
364                                         xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
365                                         break
366                                 }
367                         }
368                 }
369         }
370         xs.LogSillyf("XS EventOff: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
371         return nil
372 }
373
374 // ProjectToFolder Convert Project structure to Folder structure
375 func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig {
376         stID := ""
377         if pPrj.Type == xsapiv1.TypeCloudSync {
378                 stID, _ = xs.SThg.IDGet()
379         }
380         // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
381         fPrj := xsapiv1.FolderConfig{
382                 ID:         pPrj.ID,
383                 Label:      pPrj.Label,
384                 ClientPath: pPrj.ClientPath,
385                 Type:       xsapiv1.FolderType(pPrj.Type),
386                 Status:     pPrj.Status,
387                 IsInSync:   pPrj.IsInSync,
388                 DefaultSdk: pPrj.DefaultSdk,
389                 ClientData: pPrj.ClientData,
390                 DataPathMap: xsapiv1.PathMapConfig{
391                         ServerPath: pPrj.ServerPath,
392                 },
393                 DataCloudSync: xsapiv1.CloudSyncConfig{
394                         SyncThingID:   stID,
395                         STLocIsInSync: pPrj.IsInSync,
396                         STLocStatus:   pPrj.Status,
397                         STSvrIsInSync: pPrj.IsInSync,
398                         STSvrStatus:   pPrj.Status,
399                 },
400         }
401
402         return &fPrj
403 }
404
405 // FolderToProject Convert Folder structure to Project structure
406 func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig {
407         inSync := fPrj.IsInSync
408         sts := fPrj.Status
409
410         if fPrj.Type == xsapiv1.TypeCloudSync {
411                 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
412
413                 sts = fPrj.DataCloudSync.STSvrStatus
414                 switch fPrj.DataCloudSync.STLocStatus {
415                 case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause:
416                         sts = fPrj.DataCloudSync.STLocStatus
417                         break
418                 case xaapiv1.StatusSyncing:
419                         if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause {
420                                 sts = xaapiv1.StatusSyncing
421                         }
422                         break
423                 case xaapiv1.StatusEnable:
424                         // keep STSvrStatus
425                         break
426                 }
427         }
428
429         pPrj := xaapiv1.ProjectConfig{
430                 ID:         fPrj.ID,
431                 ServerID:   xs.ID,
432                 Label:      fPrj.Label,
433                 ClientPath: fPrj.ClientPath,
434                 ServerPath: fPrj.DataPathMap.ServerPath,
435                 Type:       xaapiv1.ProjectType(fPrj.Type),
436                 Status:     sts,
437                 IsInSync:   inSync,
438                 DefaultSdk: fPrj.DefaultSdk,
439                 ClientData: fPrj.ClientData,
440         }
441         return pPrj
442 }
443
444 // CommandAdd Add a new command to the list of running commands
445 func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error {
446         if xs.CommandGet(cmdID) != nil {
447                 return fmt.Errorf("command id already exist")
448         }
449         xs.cmdList[cmdID] = data
450         return nil
451 }
452
453 // CommandDelete Delete a command from the command list
454 func (xs *XdsServer) CommandDelete(cmdID string) error {
455         if xs.CommandGet(cmdID) == nil {
456                 return fmt.Errorf("unknown command id")
457         }
458         delete(xs.cmdList, cmdID)
459         return nil
460 }
461
462 // CommandGet Retrieve a command data
463 func (xs *XdsServer) CommandGet(cmdID string) interface{} {
464         d, exist := xs.cmdList[cmdID]
465         if exist {
466                 return d
467         }
468         return nil
469 }
470
471 /***
472 ** Private functions
473 ***/
474
475 // Create HTTP client
476 func (xs *XdsServer) _CreateConnectHTTP() error {
477         var err error
478         xs.client, err = common.HTTPNewClient(xs.BaseURL,
479                 common.HTTPClientConfig{
480                         URLPrefix:           "/api/v1",
481                         HeaderClientKeyName: "Xds-Sid",
482                         CsrfDisable:         true,
483                         LogOut:              xs.logOut,
484                         LogPrefix:           "XDSSERVER: ",
485                         LogLevel:            common.HTTPLogLevelWarning,
486                 })
487
488         xs.client.SetLogLevel(xs.Log.Level.String())
489
490         if err != nil {
491                 msg := ": " + err.Error()
492                 if strings.Contains(err.Error(), "connection refused") {
493                         msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
494                 }
495                 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
496         }
497         if xs.client == nil {
498                 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
499         }
500
501         return nil
502 }
503
504 // _Reconnect Re-established connection
505 func (xs *XdsServer) _Reconnect() error {
506         err := xs._Connect(true)
507         if err == nil {
508                 // Reload projects list for this server
509                 err = xs.projects.Init(xs)
510         }
511         return err
512 }
513
514 // _Connect Established HTTP and WS connection and retrieve XDSServer config
515 func (xs *XdsServer) _Connect(reConn bool) error {
516
517         xdsCfg := xsapiv1.APIConfig{}
518         if err := xs.client.Get("/config", &xdsCfg); err != nil {
519                 xs.Connected = false
520                 if !reConn {
521                         xs._NotifyState()
522                 }
523                 return err
524         }
525
526         if reConn && xs.ID != xdsCfg.ServerUID {
527                 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID)
528         }
529
530         // Update local XDS config
531         xs.ID = xdsCfg.ServerUID
532         xs.ServerConfig = &xdsCfg
533
534         // Establish WS connection and register listen
535         if err := xs._SocketConnect(); err != nil {
536                 xs._Disconnected()
537                 return err
538         }
539
540         xs.Connected = true
541         xs._NotifyState()
542         return nil
543 }
544
545 // _SocketConnect Create WebSocket (io.socket) connection
546 func (xs *XdsServer) _SocketConnect() error {
547
548         xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
549
550         opts := &sio_client.Options{
551                 Transport: "websocket",
552                 Header:    make(map[string][]string),
553         }
554         opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
555
556         iosk, err := sio_client.NewClient(xs.BaseURL, opts)
557         if err != nil {
558                 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
559         }
560         xs.ioSock = iosk
561
562         // Register some listeners
563
564         iosk.On("error", func(err error) {
565                 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
566                 if xs.CBOnError != nil {
567                         xs.CBOnError(err)
568                 }
569         })
570
571         iosk.On("disconnection", func(err error) {
572                 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
573                 if xs.CBOnDisconnect != nil {
574                         xs.CBOnDisconnect(err)
575                 }
576                 xs._Disconnected()
577
578                 // Try to reconnect during 15min (or at least while not disabled)
579                 go func() {
580                         count := 0
581                         waitTime := 1
582                         for !xs.Disabled && !xs.Connected {
583                                 count++
584                                 if count%60 == 0 {
585                                         waitTime *= 5
586                                 }
587                                 if waitTime > 15*60 {
588                                         xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
589                                         return
590                                 }
591                                 time.Sleep(time.Second * time.Duration(waitTime))
592                                 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
593
594                                 err := xs._Reconnect()
595                                 if err != nil &&
596                                         !(strings.Contains(err.Error(), "dial tcp") && strings.Contains(err.Error(), "connection refused")) {
597                                         xs.Log.Errorf("ERROR while reconnecting: %v", err.Error())
598                                 }
599
600                         }
601                 }()
602         })
603
604         // XXX - There is no connection event generated so, just consider that
605         // we are connected when NewClient return successfully
606         /* iosk.On("connection", func() { ... }) */
607         xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
608
609         return nil
610 }
611
612 // _Disconnected Set XDS Server as disconnected
613 func (xs *XdsServer) _Disconnected() error {
614         // Clear all register events as socket is closed
615         for k := range xs.sockEvents {
616                 delete(xs.sockEvents, k)
617         }
618         xs.Connected = false
619         xs.ioSock = nil
620         xs._NotifyState()
621         return nil
622 }
623
624 // _NotifyState Send event to notify changes
625 func (xs *XdsServer) _NotifyState() {
626
627         evSts := xaapiv1.ServerCfg{
628                 ID:         xs.ID,
629                 URL:        xs.BaseURL,
630                 APIURL:     xs.APIURL,
631                 PartialURL: xs.PartialURL,
632                 ConnRetry:  xs.ConnRetry,
633                 Connected:  xs.Connected,
634         }
635         if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil {
636                 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
637         }
638 }