518c68be90ced1412d98f28869e8a5cc209440dc
[src/xds/xds-agent.git] / lib / agent / xdsserver.go
1 package agent
2
3 import (
4         "encoding/json"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "net/http"
9         "strings"
10         "time"
11
12         "github.com/gin-gonic/gin"
13         "github.com/iotbzh/xds-agent/lib/xdsconfig"
14         common "github.com/iotbzh/xds-common/golib"
15         uuid "github.com/satori/go.uuid"
16         sio_client "github.com/sebd71/go-socket.io-client"
17 )
18
19 // Server .
20 type XdsServer struct {
21         *Context
22         ID           string
23         BaseURL      string
24         APIURL       string
25         PartialURL   string
26         ConnRetry    int
27         Connected    bool
28         Disabled     bool
29         ServerConfig *xdsServerConfig
30
31         // callbacks
32         CBOnError      func(error)
33         CBOnDisconnect func(error)
34
35         // Private fields
36         client    *common.HTTPClient
37         ioSock    *sio_client.Client
38         logOut    io.Writer
39         apiRouter *gin.RouterGroup
40 }
41
42 // xdsServerConfig Data return by GET /config
43 type xdsServerConfig struct {
44         ID               string           `json:"id"`
45         Version          string           `json:"version"`
46         APIVersion       string           `json:"apiVersion"`
47         VersionGitTag    string           `json:"gitTag"`
48         SupportedSharing map[string]bool  `json:"supportedSharing"`
49         Builder          xdsBuilderConfig `json:"builder"`
50 }
51
52 // xdsBuilderConfig represents the builder container configuration
53 type xdsBuilderConfig struct {
54         IP          string `json:"ip"`
55         Port        string `json:"port"`
56         SyncThingID string `json:"syncThingID"`
57 }
58
59 // FolderType XdsServer folder type
60 type FolderType string
61
62 const (
63         XdsTypePathMap   = "PathMap"
64         XdsTypeCloudSync = "CloudSync"
65         XdsTypeCifsSmb   = "CIFS"
66 )
67
68 // FolderConfig XdsServer folder config
69 type FolderConfig struct {
70         ID         string     `json:"id"`
71         Label      string     `json:"label"`
72         ClientPath string     `json:"path"`
73         Type       FolderType `json:"type"`
74         Status     string     `json:"status"`
75         IsInSync   bool       `json:"isInSync"`
76         DefaultSdk string     `json:"defaultSdk"`
77         // Specific data depending on which Type is used
78         DataPathMap   PathMapConfig   `json:"dataPathMap,omitempty"`
79         DataCloudSync CloudSyncConfig `json:"dataCloudSync,omitempty"`
80 }
81
82 // PathMapConfig Path mapping specific data
83 type PathMapConfig struct {
84         ServerPath string `json:"serverPath"`
85 }
86
87 // CloudSyncConfig CloudSync (AKA Syncthing) specific data
88 type CloudSyncConfig struct {
89         SyncThingID string `json:"syncThingID"`
90 }
91
92 const _IDTempoPrefix = "tempo-"
93
94 // NewXdsServer creates an instance of XdsServer
95 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
96         return &XdsServer{
97                 Context:    ctx,
98                 ID:         _IDTempoPrefix + uuid.NewV1().String(),
99                 BaseURL:    conf.URL,
100                 APIURL:     conf.APIBaseURL + conf.APIPartialURL,
101                 PartialURL: conf.APIPartialURL,
102                 ConnRetry:  conf.ConnRetry,
103                 Connected:  false,
104                 Disabled:   false,
105
106                 logOut: ctx.Log.Out,
107         }
108 }
109
110 // Close Free and close XDS Server connection
111 func (xs *XdsServer) Close() error {
112         xs.Connected = false
113         xs.Disabled = true
114         xs.ioSock = nil
115         xs._NotifyState()
116         return nil
117 }
118
119 // Connect Establish HTTP connection with XDS Server
120 func (xs *XdsServer) Connect() error {
121         var err error
122         var retry int
123
124         xs.Disabled = false
125         xs.Connected = false
126
127         err = nil
128         for retry = xs.ConnRetry; retry > 0; retry-- {
129                 if err = xs._CreateConnectHTTP(); err == nil {
130                         break
131                 }
132                 if retry == xs.ConnRetry {
133                         // Notify only on the first conn error
134                         // doing that avoid 2 notifs (conn false; conn true) on startup
135                         xs._NotifyState()
136                 }
137                 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
138                 time.Sleep(time.Second)
139         }
140         if retry == 0 {
141                 // FIXME SEB: re-use _reconnect to wait longer in background
142                 return fmt.Errorf("Connection to XDS Server failure")
143         }
144         if err != nil {
145                 return err
146         }
147
148         // Check HTTP connection and establish WS connection
149         err = xs._connect(false)
150
151         return err
152 }
153
154 // IsTempoID returns true when server as a temporary id
155 func (xs *XdsServer) IsTempoID() bool {
156         return strings.HasPrefix(xs.ID, _IDTempoPrefix)
157 }
158
159 // SetLoggerOutput Set logger ou
160 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
161         xs.logOut = out
162 }
163
164 // FolderAdd Send POST request to add a folder
165 func (xs *XdsServer) FolderAdd(prj *FolderConfig, res interface{}) error {
166         response, err := xs.HTTPPost("/folder", prj)
167         if err != nil {
168                 return err
169         }
170         if response.StatusCode != 200 {
171                 return fmt.Errorf("FolderAdd error status=%s", response.Status)
172         }
173         // Result is a FolderConfig that is equivalent to ProjectConfig
174         err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
175
176         return err
177 }
178
179 // FolderDelete Send DELETE request to delete a folder
180 func (xs *XdsServer) FolderDelete(id string) error {
181         return xs.client.HTTPDelete("/folder/" + id)
182 }
183
184 // HTTPGet .
185 func (xs *XdsServer) HTTPGet(url string, data interface{}) error {
186         var dd []byte
187         if err := xs.client.HTTPGet(url, &dd); err != nil {
188                 return err
189         }
190         return json.Unmarshal(dd, &data)
191 }
192
193 // HTTPPost .
194 func (xs *XdsServer) HTTPPost(url string, data interface{}) (*http.Response, error) {
195         body, err := json.Marshal(data)
196         if err != nil {
197                 return nil, err
198         }
199         return xs.HTTPPostBody(url, string(body))
200 }
201
202 // HTTPPostBody .
203 func (xs *XdsServer) HTTPPostBody(url string, body string) (*http.Response, error) {
204         return xs.client.HTTPPostWithRes(url, body)
205 }
206
207 // SetAPIRouterGroup .
208 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
209         xs.apiRouter = r
210 }
211
212 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
213 func (xs *XdsServer) PassthroughGet(url string) {
214         if xs.apiRouter == nil {
215                 xs.Log.Errorf("apiRouter not set !")
216                 return
217         }
218
219         xs.apiRouter.GET(url, func(c *gin.Context) {
220                 var data interface{}
221                 if err := xs.HTTPGet(url, &data); err != nil {
222                         if strings.Contains(err.Error(), "connection refused") {
223                                 xs.Connected = false
224                                 xs._NotifyState()
225                         }
226                         common.APIError(c, err.Error())
227                         return
228                 }
229
230                 c.JSON(http.StatusOK, data)
231         })
232 }
233
234 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
235 func (xs *XdsServer) PassthroughPost(url string) {
236         if xs.apiRouter == nil {
237                 xs.Log.Errorf("apiRouter not set !")
238                 return
239         }
240
241         xs.apiRouter.POST(url, func(c *gin.Context) {
242                 bodyReq := []byte{}
243                 n, err := c.Request.Body.Read(bodyReq)
244                 if err != nil {
245                         common.APIError(c, err.Error())
246                         return
247                 }
248
249                 response, err := xs.HTTPPostBody(url, string(bodyReq[:n]))
250                 if err != nil {
251                         common.APIError(c, err.Error())
252                         return
253                 }
254                 bodyRes, err := ioutil.ReadAll(response.Body)
255                 if err != nil {
256                         common.APIError(c, "Cannot read response body")
257                         return
258                 }
259                 c.JSON(http.StatusOK, string(bodyRes))
260         })
261 }
262
263 // EventOn Register a callback on events reception
264 func (xs *XdsServer) EventOn(message string, f interface{}) (err error) {
265         if xs.ioSock == nil {
266                 return fmt.Errorf("Io.Socket not initialized")
267         }
268         // FIXME SEB: support chain / multiple listeners
269         /*      sockEvents     map[string][]*caller
270         xs.sockEventsLock.Lock()
271         xs.sockEvents[message] = append(xs.sockEvents[message], f)
272         xs.sockEventsLock.Unlock()
273         xs.ioSock.On(message, func(ev) {
274
275         })
276         */
277         return xs.ioSock.On(message, f)
278 }
279
280 // ProjectToFolder
281 func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *FolderConfig {
282         stID := ""
283         if pPrj.Type == XdsTypeCloudSync {
284                 stID, _ = xs.SThg.IDGet()
285         }
286         fPrj := FolderConfig{
287                 ID:         pPrj.ID,
288                 Label:      pPrj.Label,
289                 ClientPath: pPrj.ClientPath,
290                 Type:       FolderType(pPrj.Type),
291                 Status:     pPrj.Status,
292                 IsInSync:   pPrj.IsInSync,
293                 DefaultSdk: pPrj.DefaultSdk,
294                 DataPathMap: PathMapConfig{
295                         ServerPath: pPrj.ServerPath,
296                 },
297                 DataCloudSync: CloudSyncConfig{
298                         SyncThingID: stID,
299                 },
300         }
301         return &fPrj
302 }
303
304 // FolderToProject
305 func (xs *XdsServer) FolderToProject(fPrj FolderConfig) ProjectConfig {
306         pPrj := ProjectConfig{
307                 ID:         fPrj.ID,
308                 ServerID:   xs.ID,
309                 Label:      fPrj.Label,
310                 ClientPath: fPrj.ClientPath,
311                 ServerPath: fPrj.DataPathMap.ServerPath,
312                 Type:       ProjectType(fPrj.Type),
313                 Status:     fPrj.Status,
314                 IsInSync:   fPrj.IsInSync,
315                 DefaultSdk: fPrj.DefaultSdk,
316         }
317         return pPrj
318 }
319
320 /***
321 ** Private functions
322 ***/
323
324 // Create HTTP client
325 func (xs *XdsServer) _CreateConnectHTTP() error {
326         var err error
327         xs.client, err = common.HTTPNewClient(xs.BaseURL,
328                 common.HTTPClientConfig{
329                         URLPrefix:           "/api/v1",
330                         HeaderClientKeyName: "Xds-Sid",
331                         CsrfDisable:         true,
332                         LogOut:              xs.logOut,
333                         LogPrefix:           "XDSSERVER: ",
334                         LogLevel:            common.HTTPLogLevelWarning,
335                 })
336
337         xs.client.SetLogLevel(xs.Log.Level.String())
338
339         if err != nil {
340                 msg := ": " + err.Error()
341                 if strings.Contains(err.Error(), "connection refused") {
342                         msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
343                 }
344                 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
345         }
346         if xs.client == nil {
347                 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
348         }
349
350         return nil
351 }
352
353 //  Re-established connection
354 func (xs *XdsServer) _reconnect() error {
355         err := xs._connect(true)
356         if err == nil {
357                 // Reload projects list for this server
358                 err = xs.projects.Init(xs)
359         }
360         return err
361 }
362
363 //  Established HTTP and WS connection and retrieve XDSServer config
364 func (xs *XdsServer) _connect(reConn bool) error {
365
366         xdsCfg := xdsServerConfig{}
367         if err := xs.HTTPGet("/config", &xdsCfg); err != nil {
368                 xs.Connected = false
369                 if !reConn {
370                         xs._NotifyState()
371                 }
372                 return err
373         }
374
375         if reConn && xs.ID != xdsCfg.ID {
376                 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
377         }
378
379         // Update local XDS config
380         xs.ID = xdsCfg.ID
381         xs.ServerConfig = &xdsCfg
382
383         // Establish WS connection and register listen
384         if err := xs._SocketConnect(); err != nil {
385                 xs.Connected = false
386                 xs._NotifyState()
387                 return err
388         }
389
390         xs.Connected = true
391         xs._NotifyState()
392         return nil
393 }
394
395 // Create WebSocket (io.socket) connection
396 func (xs *XdsServer) _SocketConnect() error {
397
398         xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
399
400         opts := &sio_client.Options{
401                 Transport: "websocket",
402                 Header:    make(map[string][]string),
403         }
404         opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
405
406         iosk, err := sio_client.NewClient(xs.BaseURL, opts)
407         if err != nil {
408                 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
409         }
410         xs.ioSock = iosk
411
412         // Register some listeners
413
414         iosk.On("error", func(err error) {
415                 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
416                 if xs.CBOnError != nil {
417                         xs.CBOnError(err)
418                 }
419         })
420
421         iosk.On("disconnection", func(err error) {
422                 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
423                 if xs.CBOnDisconnect != nil {
424                         xs.CBOnDisconnect(err)
425                 }
426                 xs.Connected = false
427                 xs._NotifyState()
428
429                 // Try to reconnect during 15min (or at least while not disabled)
430                 go func() {
431                         count := 0
432                         waitTime := 1
433                         for !xs.Disabled && !xs.Connected {
434                                 count++
435                                 if count%60 == 0 {
436                                         waitTime *= 5
437                                 }
438                                 if waitTime > 15*60 {
439                                         xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
440                                         return
441                                 }
442                                 time.Sleep(time.Second * time.Duration(waitTime))
443                                 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
444
445                                 xs._reconnect()
446                         }
447                 }()
448         })
449
450         // XXX - There is no connection event generated so, just consider that
451         // we are connected when NewClient return successfully
452         /* iosk.On("connection", func() { ... }) */
453         xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
454
455         return nil
456 }
457
458 // Send event to notify changes
459 func (xs *XdsServer) _NotifyState() {
460
461         evSts := ServerCfg{
462                 ID:         xs.ID,
463                 URL:        xs.BaseURL,
464                 APIURL:     xs.APIURL,
465                 PartialURL: xs.PartialURL,
466                 ConnRetry:  xs.ConnRetry,
467                 Connected:  xs.Connected,
468         }
469         if err := xs.events.Emit(EVTServerConfig, evSts); err != nil {
470                 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
471         }
472 }