Added copyright headers
[src/xds/xds-agent.git] / lib / agent / xdsserver.go
1 /*
2  * Copyright (C) 2017 "IoT.bzh"
3  * Author Sebastien Douheret <sebastien@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package agent
19
20 import (
21         "encoding/json"
22         "fmt"
23         "io"
24         "io/ioutil"
25         "net/http"
26         "strings"
27         "sync"
28         "time"
29
30         "github.com/gin-gonic/gin"
31         "github.com/iotbzh/xds-agent/lib/xaapiv1"
32         "github.com/iotbzh/xds-agent/lib/xdsconfig"
33         common "github.com/iotbzh/xds-common/golib"
34         "github.com/iotbzh/xds-server/lib/xsapiv1"
35         uuid "github.com/satori/go.uuid"
36         sio_client "github.com/sebd71/go-socket.io-client"
37 )
38
39 // XdsServer .
40 type XdsServer struct {
41         *Context
42         ID           string
43         BaseURL      string
44         APIURL       string
45         PartialURL   string
46         ConnRetry    int
47         Connected    bool
48         Disabled     bool
49         ServerConfig *xsapiv1.APIConfig
50
51         // Events management
52         CBOnError      func(error)
53         CBOnDisconnect func(error)
54         sockEvents     map[string][]*caller
55         sockEventsLock *sync.Mutex
56
57         // Private fields
58         client    *common.HTTPClient
59         ioSock    *sio_client.Client
60         logOut    io.Writer
61         apiRouter *gin.RouterGroup
62         cmdList   map[string]interface{}
63 }
64
65 // EventCB Event emitter callback
66 type EventCB func(privData interface{}, evtData interface{}) error
67
68 // caller Used to chain event listeners
69 type caller struct {
70         id          uuid.UUID
71         EventName   string
72         Func        EventCB
73         PrivateData interface{}
74 }
75
76 const _IDTempoPrefix = "tempo-"
77
78 // NewXdsServer creates an instance of XdsServer
79 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
80         return &XdsServer{
81                 Context:    ctx,
82                 ID:         _IDTempoPrefix + uuid.NewV1().String(),
83                 BaseURL:    conf.URL,
84                 APIURL:     conf.APIBaseURL + conf.APIPartialURL,
85                 PartialURL: conf.APIPartialURL,
86                 ConnRetry:  conf.ConnRetry,
87                 Connected:  false,
88                 Disabled:   false,
89
90                 sockEvents:     make(map[string][]*caller),
91                 sockEventsLock: &sync.Mutex{},
92                 logOut:         ctx.Log.Out,
93                 cmdList:        make(map[string]interface{}),
94         }
95 }
96
97 // Close Free and close XDS Server connection
98 func (xs *XdsServer) Close() error {
99         xs.Connected = false
100         xs.Disabled = true
101         xs.ioSock = nil
102         xs._NotifyState()
103         return nil
104 }
105
106 // Connect Establish HTTP connection with XDS Server
107 func (xs *XdsServer) Connect() error {
108         var err error
109         var retry int
110
111         xs.Disabled = false
112         xs.Connected = false
113
114         err = nil
115         for retry = xs.ConnRetry; retry > 0; retry-- {
116                 if err = xs._CreateConnectHTTP(); err == nil {
117                         break
118                 }
119                 if retry == xs.ConnRetry {
120                         // Notify only on the first conn error
121                         // doing that avoid 2 notifs (conn false; conn true) on startup
122                         xs._NotifyState()
123                 }
124                 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
125                 time.Sleep(time.Second)
126         }
127         if retry == 0 {
128                 // FIXME: re-use _reconnect to wait longer in background
129                 return fmt.Errorf("Connection to XDS Server failure")
130         }
131         if err != nil {
132                 return err
133         }
134
135         // Check HTTP connection and establish WS connection
136         err = xs._connect(false)
137
138         return err
139 }
140
141 // IsTempoID returns true when server as a temporary id
142 func (xs *XdsServer) IsTempoID() bool {
143         return strings.HasPrefix(xs.ID, _IDTempoPrefix)
144 }
145
146 // SetLoggerOutput Set logger ou
147 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
148         xs.logOut = out
149 }
150
151 // SendCommand Send a command to XDS Server
152 func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error {
153         url := cmd
154         if !strings.HasPrefix("/", cmd) {
155                 url = "/" + cmd
156         }
157         return xs.client.Post(url, string(body), res)
158 }
159
160 // GetVersion Send Get request to retrieve XDS Server version
161 func (xs *XdsServer) GetVersion(res interface{}) error {
162         return xs.client.Get("/version", &res)
163 }
164
165 // GetFolders Send GET request to get current folder configuration
166 func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error {
167         return xs.client.Get("/folders", folders)
168 }
169
170 // FolderAdd Send POST request to add a folder
171 func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error {
172         err := xs.client.Post("/folders", fld, res)
173         if err != nil {
174                 return fmt.Errorf("FolderAdd error: %s", err.Error())
175         }
176         return err
177 }
178
179 // FolderDelete Send DELETE request to delete a folder
180 func (xs *XdsServer) FolderDelete(id string) error {
181         return xs.client.HTTPDelete("/folders/" + id)
182 }
183
184 // FolderSync Send POST request to force synchronization of a folder
185 func (xs *XdsServer) FolderSync(id string) error {
186         return xs.client.HTTPPost("/folders/sync/"+id, "")
187 }
188
189 // FolderUpdate Send PUT request to update a folder
190 func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error {
191         return xs.client.Put("/folders/"+fld.ID, fld, resFld)
192 }
193
194 // CommandExec Send POST request to execute a command
195 func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error {
196         return xs.client.Post("/exec", args, res)
197 }
198
199 // CommandSignal Send POST request to send a signal to a command
200 func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error {
201         return xs.client.Post("/signal", args, res)
202 }
203
204 // SetAPIRouterGroup .
205 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
206         xs.apiRouter = r
207 }
208
209 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
210 func (xs *XdsServer) PassthroughGet(url string) {
211         if xs.apiRouter == nil {
212                 xs.Log.Errorf("apiRouter not set !")
213                 return
214         }
215
216         xs.apiRouter.GET(url, func(c *gin.Context) {
217                 var data interface{}
218                 // Take care of param (eg. id in /projects/:id)
219                 nURL := url
220                 if strings.Contains(url, ":") {
221                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
222                 }
223                 // Send Get request
224                 if err := xs.client.Get(nURL, &data); err != nil {
225                         if strings.Contains(err.Error(), "connection refused") {
226                                 xs.Connected = false
227                                 xs._NotifyState()
228                         }
229                         common.APIError(c, err.Error())
230                         return
231                 }
232
233                 c.JSON(http.StatusOK, data)
234         })
235 }
236
237 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
238 func (xs *XdsServer) PassthroughPost(url string) {
239         if xs.apiRouter == nil {
240                 xs.Log.Errorf("apiRouter not set !")
241                 return
242         }
243
244         xs.apiRouter.POST(url, func(c *gin.Context) {
245                 bodyReq := []byte{}
246                 n, err := c.Request.Body.Read(bodyReq)
247                 if err != nil {
248                         common.APIError(c, err.Error())
249                         return
250                 }
251
252                 // Take care of param (eg. id in /projects/:id)
253                 nURL := url
254                 if strings.Contains(url, ":") {
255                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
256                 }
257
258                 // Send Post request
259                 body, err := json.Marshal(bodyReq[:n])
260                 if err != nil {
261                         common.APIError(c, err.Error())
262                         return
263                 }
264
265                 response, err := xs.client.HTTPPostWithRes(nURL, string(body))
266                 if err != nil {
267                         common.APIError(c, err.Error())
268                         return
269                 }
270
271                 bodyRes, err := ioutil.ReadAll(response.Body)
272                 if err != nil {
273                         common.APIError(c, "Cannot read response body")
274                         return
275                 }
276                 c.JSON(http.StatusOK, string(bodyRes))
277         })
278 }
279
280 // EventRegister Post a request to register to an XdsServer event
281 func (xs *XdsServer) EventRegister(evName string, id string) error {
282         return xs.client.Post(
283                 "/events/register",
284                 xsapiv1.EventRegisterArgs{
285                         Name:      evName,
286                         ProjectID: id,
287                 },
288                 nil)
289 }
290
291 // EventEmit Emit a event to XDS Server through WS
292 func (xs *XdsServer) EventEmit(message string, args ...interface{}) error {
293         if xs.ioSock == nil {
294                 return fmt.Errorf("Io.Socket not initialized")
295         }
296
297         return xs.ioSock.Emit(message, args...)
298 }
299
300 // EventOn Register a callback on events reception
301 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
302         if xs.ioSock == nil {
303                 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
304         }
305
306         xs.sockEventsLock.Lock()
307         defer xs.sockEventsLock.Unlock()
308
309         if _, exist := xs.sockEvents[evName]; !exist {
310                 // Register listener only the first time
311                 evn := evName
312
313                 // FIXME: use generic type: data interface{} instead of data xsapiv1.EventMsg
314                 var err error
315                 if evName == "event:folder-state-change" {
316                         err = xs.ioSock.On(evn, func(data xsapiv1.EventMsg) error {
317                                 xs.sockEventsLock.Lock()
318                                 sEvts := make([]*caller, len(xs.sockEvents[evn]))
319                                 copy(sEvts, xs.sockEvents[evn])
320                                 xs.sockEventsLock.Unlock()
321                                 for _, c := range sEvts {
322                                         c.Func(c.PrivateData, data)
323                                 }
324                                 return nil
325                         })
326                 } else {
327                         err = xs.ioSock.On(evn, func(data interface{}) error {
328                                 xs.sockEventsLock.Lock()
329                                 sEvts := make([]*caller, len(xs.sockEvents[evn]))
330                                 copy(sEvts, xs.sockEvents[evn])
331                                 xs.sockEventsLock.Unlock()
332                                 for _, c := range sEvts {
333                                         c.Func(c.PrivateData, data)
334                                 }
335                                 return nil
336                         })
337                 }
338                 if err != nil {
339                         return uuid.Nil, err
340                 }
341         }
342
343         c := &caller{
344                 id:          uuid.NewV1(),
345                 EventName:   evName,
346                 Func:        f,
347                 PrivateData: privData,
348         }
349
350         xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
351         return c.id, nil
352 }
353
354 // EventOff Un-register a (or all) callbacks associated to an event
355 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
356         xs.sockEventsLock.Lock()
357         defer xs.sockEventsLock.Unlock()
358         if _, exist := xs.sockEvents[evName]; exist {
359                 if id == uuid.Nil {
360                         // Un-register all
361                         xs.sockEvents[evName] = []*caller{}
362                 } else {
363                         // Un-register only the specified callback
364                         for i, ff := range xs.sockEvents[evName] {
365                                 if uuid.Equal(ff.id, id) {
366                                         xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
367                                         break
368                                 }
369                         }
370                 }
371         }
372         return nil
373 }
374
375 // ProjectToFolder Convert Project structure to Folder structure
376 func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig {
377         stID := ""
378         if pPrj.Type == xsapiv1.TypeCloudSync {
379                 stID, _ = xs.SThg.IDGet()
380         }
381         // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
382         fPrj := xsapiv1.FolderConfig{
383                 ID:         pPrj.ID,
384                 Label:      pPrj.Label,
385                 ClientPath: pPrj.ClientPath,
386                 Type:       xsapiv1.FolderType(pPrj.Type),
387                 Status:     pPrj.Status,
388                 IsInSync:   pPrj.IsInSync,
389                 DefaultSdk: pPrj.DefaultSdk,
390                 ClientData: pPrj.ClientData,
391                 DataPathMap: xsapiv1.PathMapConfig{
392                         ServerPath: pPrj.ServerPath,
393                 },
394                 DataCloudSync: xsapiv1.CloudSyncConfig{
395                         SyncThingID:   stID,
396                         STLocIsInSync: pPrj.IsInSync,
397                         STLocStatus:   pPrj.Status,
398                         STSvrIsInSync: pPrj.IsInSync,
399                         STSvrStatus:   pPrj.Status,
400                 },
401         }
402
403         return &fPrj
404 }
405
406 // FolderToProject Convert Folder structure to Project structure
407 func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig {
408         inSync := fPrj.IsInSync
409         sts := fPrj.Status
410
411         if fPrj.Type == xsapiv1.TypeCloudSync {
412                 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
413
414                 sts = fPrj.DataCloudSync.STSvrStatus
415                 switch fPrj.DataCloudSync.STLocStatus {
416                 case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause:
417                         sts = fPrj.DataCloudSync.STLocStatus
418                         break
419                 case xaapiv1.StatusSyncing:
420                         if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause {
421                                 sts = xaapiv1.StatusSyncing
422                         }
423                         break
424                 case xaapiv1.StatusEnable:
425                         // keep STSvrStatus
426                         break
427                 }
428         }
429
430         pPrj := xaapiv1.ProjectConfig{
431                 ID:         fPrj.ID,
432                 ServerID:   xs.ID,
433                 Label:      fPrj.Label,
434                 ClientPath: fPrj.ClientPath,
435                 ServerPath: fPrj.DataPathMap.ServerPath,
436                 Type:       xaapiv1.ProjectType(fPrj.Type),
437                 Status:     sts,
438                 IsInSync:   inSync,
439                 DefaultSdk: fPrj.DefaultSdk,
440                 ClientData: fPrj.ClientData,
441         }
442         return pPrj
443 }
444
445 // CommandAdd Add a new command to the list of running commands
446 func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error {
447         if xs.CommandGet(cmdID) != nil {
448                 return fmt.Errorf("command id already exist")
449         }
450         xs.cmdList[cmdID] = data
451         return nil
452 }
453
454 // CommandDelete Delete a command from the command list
455 func (xs *XdsServer) CommandDelete(cmdID string) error {
456         if xs.CommandGet(cmdID) == nil {
457                 return fmt.Errorf("unknown command id")
458         }
459         delete(xs.cmdList, cmdID)
460         return nil
461 }
462
463 // CommandGet Retrieve a command data
464 func (xs *XdsServer) CommandGet(cmdID string) interface{} {
465         d, exist := xs.cmdList[cmdID]
466         if exist {
467                 return d
468         }
469         return nil
470 }
471
472 /***
473 ** Private functions
474 ***/
475
476 // Create HTTP client
477 func (xs *XdsServer) _CreateConnectHTTP() error {
478         var err error
479         xs.client, err = common.HTTPNewClient(xs.BaseURL,
480                 common.HTTPClientConfig{
481                         URLPrefix:           "/api/v1",
482                         HeaderClientKeyName: "Xds-Sid",
483                         CsrfDisable:         true,
484                         LogOut:              xs.logOut,
485                         LogPrefix:           "XDSSERVER: ",
486                         LogLevel:            common.HTTPLogLevelWarning,
487                 })
488
489         xs.client.SetLogLevel(xs.Log.Level.String())
490
491         if err != nil {
492                 msg := ": " + err.Error()
493                 if strings.Contains(err.Error(), "connection refused") {
494                         msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
495                 }
496                 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
497         }
498         if xs.client == nil {
499                 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
500         }
501
502         return nil
503 }
504
505 //  Re-established connection
506 func (xs *XdsServer) _reconnect() error {
507         err := xs._connect(true)
508         if err == nil {
509                 // Reload projects list for this server
510                 err = xs.projects.Init(xs)
511         }
512         return err
513 }
514
515 //  Established HTTP and WS connection and retrieve XDSServer config
516 func (xs *XdsServer) _connect(reConn bool) error {
517
518         xdsCfg := xsapiv1.APIConfig{}
519         if err := xs.client.Get("/config", &xdsCfg); err != nil {
520                 xs.Connected = false
521                 if !reConn {
522                         xs._NotifyState()
523                 }
524                 return err
525         }
526
527         if reConn && xs.ID != xdsCfg.ServerUID {
528                 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID)
529         }
530
531         // Update local XDS config
532         xs.ID = xdsCfg.ServerUID
533         xs.ServerConfig = &xdsCfg
534
535         // Establish WS connection and register listen
536         if err := xs._SocketConnect(); err != nil {
537                 xs.Connected = false
538                 xs._NotifyState()
539                 return err
540         }
541
542         xs.Connected = true
543         xs._NotifyState()
544         return nil
545 }
546
547 // Create WebSocket (io.socket) connection
548 func (xs *XdsServer) _SocketConnect() error {
549
550         xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
551
552         opts := &sio_client.Options{
553                 Transport: "websocket",
554                 Header:    make(map[string][]string),
555         }
556         opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
557
558         iosk, err := sio_client.NewClient(xs.BaseURL, opts)
559         if err != nil {
560                 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
561         }
562         xs.ioSock = iosk
563
564         // Register some listeners
565
566         iosk.On("error", func(err error) {
567                 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
568                 if xs.CBOnError != nil {
569                         xs.CBOnError(err)
570                 }
571         })
572
573         iosk.On("disconnection", func(err error) {
574                 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
575                 if xs.CBOnDisconnect != nil {
576                         xs.CBOnDisconnect(err)
577                 }
578                 xs.Connected = false
579                 xs._NotifyState()
580
581                 // Try to reconnect during 15min (or at least while not disabled)
582                 go func() {
583                         count := 0
584                         waitTime := 1
585                         for !xs.Disabled && !xs.Connected {
586                                 count++
587                                 if count%60 == 0 {
588                                         waitTime *= 5
589                                 }
590                                 if waitTime > 15*60 {
591                                         xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
592                                         return
593                                 }
594                                 time.Sleep(time.Second * time.Duration(waitTime))
595                                 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
596
597                                 xs._reconnect()
598                         }
599                 }()
600         })
601
602         // XXX - There is no connection event generated so, just consider that
603         // we are connected when NewClient return successfully
604         /* iosk.On("connection", func() { ... }) */
605         xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
606
607         return nil
608 }
609
610 // Send event to notify changes
611 func (xs *XdsServer) _NotifyState() {
612
613         evSts := xaapiv1.ServerCfg{
614                 ID:         xs.ID,
615                 URL:        xs.BaseURL,
616                 APIURL:     xs.APIURL,
617                 PartialURL: xs.PartialURL,
618                 ConnRetry:  xs.ConnRetry,
619                 Connected:  xs.Connected,
620         }
621         if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil {
622                 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
623         }
624 }