13 "github.com/gin-gonic/gin"
14 "github.com/iotbzh/xds-agent/lib/xaapiv1"
15 "github.com/iotbzh/xds-agent/lib/xdsconfig"
16 common "github.com/iotbzh/xds-common/golib"
17 "github.com/iotbzh/xds-server/lib/xsapiv1"
18 uuid "github.com/satori/go.uuid"
19 sio_client "github.com/sebd71/go-socket.io-client"
// XdsServer holds the state of one connection to a remote XDS Server.
23 type XdsServer struct {
// ServerConfig is the configuration fetched from the server's /config
// endpoint by _connect.
32 ServerConfig *xsapiv1.APIConfig
// CBOnDisconnect, when non-nil, is called with the error reported by the
// IO.socket "disconnection" event.
36 CBOnDisconnect func(error)
// sockEvents maps an event name to the callbacks registered through EventOn.
37 sockEvents map[string][]*caller
// sockEventsLock guards sockEvents.
38 sockEventsLock *sync.Mutex
// client is the HTTP client used for REST requests to the XDS Server.
41 client *common.HTTPClient
// ioSock is the IO.socket client; EventOn refuses to register while it is nil.
42 ioSock *sio_client.Client
// apiRouter is the gin router group on which passthrough routes are declared.
44 apiRouter *gin.RouterGroup
47 // EventCB is the callback signature for listeners registered with EventOn.
// privData is the private data supplied at registration time and evtData is
// the payload of the received event.
48 type EventCB func(privData interface{}, evtData interface{}) error
50 // caller is used to chain event listeners registered on the same event.
// PrivateData is handed back to the listener callback as its first argument.
55 PrivateData interface{}
// _IDTempoPrefix is the prefix of the temporary ID given to a server by
// NewXdsServer; IsTempoID tests for it.
58 const _IDTempoPrefix = "tempo-"
60 // NewXdsServer creates an instance of XdsServer from the given configuration.
// The new instance gets a temporary ID (_IDTempoPrefix followed by a UUID);
// _connect later overwrites it with the UID reported by the server.
61 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
64 ID: _IDTempoPrefix + uuid.NewV1().String(),
// Full API URL is the base URL followed by the partial (relative) URL.
66 APIURL: conf.APIBaseURL + conf.APIPartialURL,
67 PartialURL: conf.APIPartialURL,
68 ConnRetry: conf.ConnRetry,
// Event listeners table and its lock are allocated up front so that
// EventOn/EventOff can be used without further initialization.
72 sockEvents: make(map[string][]*caller),
73 sockEventsLock: &sync.Mutex{},
78 // Close frees and closes the XDS Server connection.
79 func (xs *XdsServer) Close() error {
87 // Connect establishes the HTTP connection with the XDS Server.
// It tries to create the HTTP client up to xs.ConnRetry times, sleeping one
// second between attempts, and then calls _connect(false) to check the HTTP
// connection and open the WebSocket connection.
88 func (xs *XdsServer) Connect() error {
// retry counts DOWN from xs.ConnRetry to 1, so the value logged below
// decreases on each attempt.
96 for retry = xs.ConnRetry; retry > 0; retry-- {
97 if err = xs._CreateConnectHTTP(); err == nil {
100 if retry == xs.ConnRetry {
101 // Notify only on the first connection error:
102 // doing so avoids 2 notifications (conn false; conn true) on startup
105 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
106 time.Sleep(time.Second)
109 // FIXME: re-use _reconnect to wait longer in background
110 return fmt.Errorf("Connection to XDS Server failure")
116 // Check HTTP connection and establish WS connection
117 err = xs._connect(false)
122 // IsTempoID returns true when server as a temporary id
123 func (xs *XdsServer) IsTempoID() bool {
124 return strings.HasPrefix(xs.ID, _IDTempoPrefix)
127 // SetLoggerOutput sets the logger output to the given writer.
128 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
132 // SendCommand Send a command to XDS Server
133 func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error {
135 if !strings.HasPrefix("/", cmd) {
138 return xs.client.Post(url, string(body), res)
141 // GetVersion Send Get request to retrieve XDS Server version
142 func (xs *XdsServer) GetVersion(res interface{}) error {
143 return xs.client.Get("/version", &res)
146 // GetFolders Send GET request to get current folder configuration
147 func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error {
148 return xs.client.Get("/folders", folders)
151 // FolderAdd Send POST request to add a folder
152 func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error {
153 err := xs.client.Post("/folders", fld, res)
155 return fmt.Errorf("FolderAdd error: %s", err.Error())
160 // FolderDelete Send DELETE request to delete a folder
161 func (xs *XdsServer) FolderDelete(id string) error {
162 return xs.client.HTTPDelete("/folders/" + id)
165 // FolderSync Send POST request to force synchronization of a folder
166 func (xs *XdsServer) FolderSync(id string) error {
167 return xs.client.HTTPPost("/folders/sync/"+id, "")
170 // FolderUpdate Send PUT request to update a folder
171 func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error {
172 return xs.client.Put("/folders/"+fld.ID, fld, resFld)
175 // SetAPIRouterGroup sets the gin router group of this server.
// NOTE(review): presumably stored in xs.apiRouter, which PassthroughGet and
// PassthroughPost require to be non-nil — confirm.
176 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
180 // PassthroughGet declares a route that directly forwards a GET request to the XDS Server.
// url is the route pattern registered on the API router group; it may contain
// gin parameters (eg. /projects/:id). An error is logged when the router
// group has not been set.
181 func (xs *XdsServer) PassthroughGet(url string) {
182 if xs.apiRouter == nil {
183 xs.Log.Errorf("apiRouter not set !")
187 xs.apiRouter.GET(url, func(c *gin.Context) {
189 // Take care of param (eg. id in /projects/:id)
// When the pattern holds a parameter, forward the actual request path
// (stripped of the xs.APIURL prefix) instead of the pattern itself.
191 if strings.Contains(url, ":") {
192 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
195 if err := xs.client.Get(nURL, &data); err != nil {
// A refused connection is detected by matching the error text.
196 if strings.Contains(err.Error(), "connection refused") {
200 common.APIError(c, err.Error())
// Relay the data fetched from the XDS Server with a 200 status.
204 c.JSON(http.StatusOK, data)
208 // PassthroughPost declares a route that directly forwards a POST request to the XDS Server.
// url is the route pattern registered on the API router group; it may contain
// gin parameters (eg. /projects/:id). An error is logged when the router
// group has not been set.
209 func (xs *XdsServer) PassthroughPost(url string) {
210 if xs.apiRouter == nil {
211 xs.Log.Errorf("apiRouter not set !")
215 xs.apiRouter.POST(url, func(c *gin.Context) {
// NOTE(review): a single Read is not guaranteed to return the whole request
// body (io.Reader may return fewer bytes than available) — confirm.
217 n, err := c.Request.Body.Read(bodyReq)
219 common.APIError(c, err.Error())
223 // Take care of param (eg. id in /projects/:id)
// When the pattern holds a parameter, forward the actual request path
// (stripped of the xs.APIURL prefix) instead of the pattern itself.
225 if strings.Contains(url, ":") {
226 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
// NOTE(review): json.Marshal of a []byte yields a base64-encoded JSON
// string, not the original payload — confirm this is what the server expects.
230 body, err := json.Marshal(bodyReq[:n])
232 common.APIError(c, err.Error())
236 response, err := xs.client.HTTPPostWithRes(nURL, string(body))
238 common.APIError(c, err.Error())
242 bodyRes, err := ioutil.ReadAll(response.Body)
244 common.APIError(c, "Cannot read response body")
// NOTE(review): the reply body is sent back as a JSON *string*, so a JSON
// reply from the server gets encoded a second time — confirm.
247 c.JSON(http.StatusOK, string(bodyRes))
251 // EventRegister posts a request to register to an XdsServer event.
// The request carries an xsapiv1.EventRegisterArgs payload.
// NOTE(review): presumably built from evName and id — confirm.
252 func (xs *XdsServer) EventRegister(evName string, id string) error {
253 return xs.client.Post(
255 xsapiv1.EventRegisterArgs{
262 // EventOn registers a callback called on reception of the event evName.
// privData is passed back to f as its first argument on each event.
// It returns uuid.Nil and an error when the IO.socket is not initialized.
263 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
264 if xs.ioSock == nil {
265 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
// The lock is held for the whole registration.
268 xs.sockEventsLock.Lock()
269 defer xs.sockEventsLock.Unlock()
271 if _, exist := xs.sockEvents[evName]; !exist {
272 // Register listener only the first time
275 // FIXME: use generic type: data interface{} instead of data xsapiv1.EventMsg
// "event:folder-state-change" gets a listener typed on xsapiv1.EventMsg;
// any other event gets a listener taking untyped data.
277 if evName == "event:folder-state-change" {
278 err = xs.ioSock.On(evn, func(data xsapiv1.EventMsg) error {
// Snapshot the callers list under the lock, then call the callbacks with the
// lock released. Values returned by the callbacks are ignored.
279 xs.sockEventsLock.Lock()
280 sEvts := make([]*caller, len(xs.sockEvents[evn]))
281 copy(sEvts, xs.sockEvents[evn])
282 xs.sockEventsLock.Unlock()
283 for _, c := range sEvts {
284 c.Func(c.PrivateData, data)
289 err = xs.ioSock.On(evn, func(data interface{}) error {
// Same snapshot-then-call scheme as above.
290 xs.sockEventsLock.Lock()
291 sEvts := make([]*caller, len(xs.sockEvents[evn]))
292 copy(sEvts, xs.sockEvents[evn])
293 xs.sockEventsLock.Unlock()
294 for _, c := range sEvts {
295 c.Func(c.PrivateData, data)
309 PrivateData: privData,
// Chain the new caller after those already registered on this event.
312 xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
316 // EventOff un-registers one (or all) callbacks associated to an event.
// id identifies the callback to remove, as returned by EventOn.
317 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
318 xs.sockEventsLock.Lock()
319 defer xs.sockEventsLock.Unlock()
320 if _, exist := xs.sockEvents[evName]; exist {
// The list is emptied but the map key is kept, so a later EventOn on the
// same event does not install a second socket listener.
323 xs.sockEvents[evName] = []*caller{}
325 // Un-register only the specified callback
326 for i, ff := range xs.sockEvents[evName] {
327 if uuid.Equal(ff.id, id) {
// Remove element i in place (the backing array is reused).
328 xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
337 // ProjectToFolder converts a Project structure (agent side) into a Folder
// structure (server side) and returns a pointer to it.
338 func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig {
// The Syncthing ID is only fetched for cloud-sync projects; the error
// returned by IDGet is discarded.
340 if pPrj.Type == xsapiv1.TypeCloudSync {
341 stID, _ = xs.SThg.IDGet()
343 // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
344 fPrj := xsapiv1.FolderConfig{
347 ClientPath: pPrj.ClientPath,
348 Type: xsapiv1.FolderType(pPrj.Type),
350 IsInSync: pPrj.IsInSync,
351 DefaultSdk: pPrj.DefaultSdk,
352 ClientData: pPrj.ClientData,
353 DataPathMap: xsapiv1.PathMapConfig{
354 ServerPath: pPrj.ServerPath,
356 DataCloudSync: xsapiv1.CloudSyncConfig{
// Both the local and the server side start from the project's own
// sync flag and status.
358 STLocIsInSync: pPrj.IsInSync,
359 STLocStatus: pPrj.Status,
360 STSvrIsInSync: pPrj.IsInSync,
361 STSvrStatus: pPrj.Status,
368 // FolderToProject converts a Folder structure (server side) into a Project
// structure (agent side).
369 func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig {
370 inSync := fPrj.IsInSync
373 if fPrj.Type == xsapiv1.TypeCloudSync {
// A cloud-sync folder is in sync only when both the server side and the
// local side are.
374 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
// Start from the server-side status, then let the local status override it
// according to the cases below.
376 sts = fPrj.DataCloudSync.STSvrStatus
377 switch fPrj.DataCloudSync.STLocStatus {
// A local error / disabled / paused status always wins.
378 case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause:
379 sts = fPrj.DataCloudSync.STLocStatus
// Local syncing wins unless the server side is in error, disabled or paused.
381 case xaapiv1.StatusSyncing:
382 if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause {
383 sts = xaapiv1.StatusSyncing
386 case xaapiv1.StatusEnable:
392 pPrj := xaapiv1.ProjectConfig{
396 ClientPath: fPrj.ClientPath,
397 ServerPath: fPrj.DataPathMap.ServerPath,
398 Type: xaapiv1.ProjectType(fPrj.Type),
401 DefaultSdk: fPrj.DefaultSdk,
402 ClientData: fPrj.ClientData,
411 // _CreateConnectHTTP creates the HTTP client (xs.client) bound to xs.BaseURL.
// It returns an error when the client cannot be created or is nil.
412 func (xs *XdsServer) _CreateConnectHTTP() error {
414 xs.client, err = common.HTTPNewClient(xs.BaseURL,
415 common.HTTPClientConfig{
416 URLPrefix: "/api/v1",
417 HeaderClientKeyName: "Xds-Sid",
420 LogPrefix: "XDSSERVER: ",
421 LogLevel: common.HTTPLogLevelWarning,
// Align the HTTP client log level on this server's logger level.
// NOTE(review): this appears to run before err is inspected — confirm
// xs.client is non-nil when HTTPNewClient fails.
424 xs.client.SetLogLevel(xs.Log.Level.String())
// By default the error text is appended to the message; for a refused
// connection it is replaced by the server URL.
427 msg := ": " + err.Error()
428 if strings.Contains(err.Error(), "connection refused") {
429 msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
431 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
433 if xs.client == nil {
434 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
440 // _reconnect re-establishes the connection by calling _connect(true), and
// re-initializes the projects list of this server.
441 func (xs *XdsServer) _reconnect() error {
442 err := xs._connect(true)
444 // Reload projects list for this server
445 err = xs.projects.Init(xs)
450 // _connect establishes the HTTP and WS connections and retrieves the XDS Server config.
// reConn must be true on a re-connection: a warning is then logged when the
// UID reported by the server differs from the one currently stored.
451 func (xs *XdsServer) _connect(reConn bool) error {
// Fetching /config both checks the HTTP connection and gets the server UID.
453 xdsCfg := xsapiv1.APIConfig{}
454 if err := xs.client.Get("/config", &xdsCfg); err != nil {
462 if reConn && xs.ID != xdsCfg.ServerUID {
463 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID)
466 // Update local XDS config
// The (possibly temporary) ID is replaced by the UID reported by the server.
467 xs.ID = xdsCfg.ServerUID
468 xs.ServerConfig = &xdsCfg
470 // Establish WS connection and register listen
471 if err := xs._SocketConnect(); err != nil {
482 // _SocketConnect creates the WebSocket (io.socket) connection to xs.BaseURL
// and registers the "error" and "disconnection" listeners.
483 func (xs *XdsServer) _SocketConnect() error {
485 xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
487 opts := &sio_client.Options{
488 Transport: "websocket",
489 Header: make(map[string][]string),
// Send the HTTP client's ID in the XDS-SID header of the socket connection.
491 opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
493 iosk, err := sio_client.NewClient(xs.BaseURL, opts)
495 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
499 // Register some listeners
501 iosk.On("error", func(err error) {
502 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
503 if xs.CBOnError != nil {
508 iosk.On("disconnection", func(err error) {
509 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
510 if xs.CBOnDisconnect != nil {
511 xs.CBOnDisconnect(err)
516 // Try to reconnect during 15min (or at least while not disabled)
// Loop while the server is neither disabled nor connected, sleeping
// waitTime seconds between attempts and giving up once waitTime exceeds
// 15*60.
520 for !xs.Disabled && !xs.Connected {
525 if waitTime > 15*60 {
526 xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
529 time.Sleep(time.Second * time.Duration(waitTime))
530 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
537 // XXX - There is no connection event generated so, just consider that
538 // we are connected when NewClient returns successfully
539 /* iosk.On("connection", func() { ... }) */
540 xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
545 // _NotifyState emits an xaapiv1.EVTServerConfig event describing the current
// state of this server; an emit failure is only logged as a warning.
546 func (xs *XdsServer) _NotifyState() {
548 evSts := xaapiv1.ServerCfg{
552 PartialURL: xs.PartialURL,
553 ConnRetry: xs.ConnRetry,
554 Connected: xs.Connected,
556 if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil {
557 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)