+/*
+ * Copyright (C) 2017-2018 "IoT.bzh"
+ * Author Sebastien Douheret <sebastien@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package agent
import (
"encoding/json"
"fmt"
"io"
- "io/ioutil"
"net/http"
"strings"
+ "sync"
"time"
+ "gerrit.automotivelinux.org/gerrit/src/xds/xds-agent/lib/xaapiv1"
+ "gerrit.automotivelinux.org/gerrit/src/xds/xds-agent/lib/xdsconfig"
+ common "gerrit.automotivelinux.org/gerrit/src/xds/xds-common.git/golib"
+ "gerrit.automotivelinux.org/gerrit/src/xds/xds-server.git/lib/xsapiv1"
"github.com/gin-gonic/gin"
- "github.com/iotbzh/xds-agent/lib/xdsconfig"
- common "github.com/iotbzh/xds-common/golib"
uuid "github.com/satori/go.uuid"
- sio_client "github.com/zhouhui8915/go-socket.io-client"
+ sio_client "github.com/sebd71/go-socket.io-client"
)
-// Server .
+// XdsServer .
type XdsServer struct {
*Context
ID string
+ URLIndex string
BaseURL string
APIURL string
PartialURL string
ConnRetry int
Connected bool
Disabled bool
- ServerConfig *xdsServerConfig
+ ServerConfig *xsapiv1.APIConfig
- // callbacks
+ // Events management
CBOnError func(error)
CBOnDisconnect func(error)
+ sockEvents map[string][]*caller
+ sockEventsLock *sync.Mutex
// Private fields
- client *common.HTTPClient
- ioSock *sio_client.Client
- logOut io.Writer
- apiRouter *gin.RouterGroup
-}
-
-// xdsServerConfig Data return by GET /config
-type xdsServerConfig struct {
- ID string `json:"id"`
- Version string `json:"version"`
- APIVersion string `json:"apiVersion"`
- VersionGitTag string `json:"gitTag"`
- SupportedSharing map[string]bool `json:"supportedSharing"`
- Builder xdsBuilderConfig `json:"builder"`
+ client *common.HTTPClient
+ ioSock *sio_client.Client
+ logOut io.Writer
+ apiRouter *gin.RouterGroup
+ cmdList map[string]interface{}
+ cbOnConnect OnConnectedCB
}
-// xdsBuilderConfig represents the builder container configuration
-type xdsBuilderConfig struct {
- IP string `json:"ip"`
- Port string `json:"port"`
- SyncThingID string `json:"syncThingID"`
-}
-
-// FolderType XdsServer folder type
-type FolderType string
-
-const (
- XdsTypePathMap = "PathMap"
- XdsTypeCloudSync = "CloudSync"
- XdsTypeCifsSmb = "CIFS"
-)
-
-// FolderConfig XdsServer folder config
-type FolderConfig struct {
- ID string `json:"id"`
- Label string `json:"label"`
- ClientPath string `json:"path"`
- Type FolderType `json:"type"`
- Status string `json:"status"`
- IsInSync bool `json:"isInSync"`
- DefaultSdk string `json:"defaultSdk"`
- // Specific data depending on which Type is used
- DataPathMap PathMapConfig `json:"dataPathMap,omitempty"`
- DataCloudSync CloudSyncConfig `json:"dataCloudSync,omitempty"`
-}
+// EventCB Event emitter callback
+type EventCB func(privData interface{}, evtData interface{}) error
-// PathMapConfig Path mapping specific data
-type PathMapConfig struct {
- ServerPath string `json:"serverPath"`
-}
+// OnConnectedCB connect callback
+type OnConnectedCB func(svr *XdsServer) error
-// CloudSyncConfig CloudSync (AKA Syncthing) specific data
-type CloudSyncConfig struct {
- SyncThingID string `json:"syncThingID"`
+// caller Used to chain event listeners
+type caller struct {
+ id uuid.UUID
+ EventName string
+ Func EventCB
+ PrivateData interface{}
}
const _IDTempoPrefix = "tempo-"
return &XdsServer{
Context: ctx,
ID: _IDTempoPrefix + uuid.NewV1().String(),
+ URLIndex: conf.URLIndex,
BaseURL: conf.URL,
APIURL: conf.APIBaseURL + conf.APIPartialURL,
PartialURL: conf.APIPartialURL,
Connected: false,
Disabled: false,
- logOut: ctx.Log.Out,
+ sockEvents: make(map[string][]*caller),
+ sockEventsLock: &sync.Mutex{},
+ logOut: ctx.Log.Out,
+ cmdList: make(map[string]interface{}),
}
}
// Close Free and close XDS Server connection
func (xs *XdsServer) Close() error {
- xs.Connected = false
+ err := xs._Disconnected()
xs.Disabled = true
- xs.ioSock = nil
- xs._NotifyState()
- return nil
+ return err
}
// Connect Establish HTTP connection with XDS Server
time.Sleep(time.Second)
}
if retry == 0 {
- // FIXME SEB: re-use _reconnect to wait longer in background
+ // FIXME: re-use _Reconnect to wait longer in background
return fmt.Errorf("Connection to XDS Server failure")
}
if err != nil {
}
// Check HTTP connection and establish WS connection
- err = xs._connect(false)
+ err = xs._Connect(false)
return err
}
+// ConnectOn Register a callback on events reception
+func (xs *XdsServer) ConnectOn(f OnConnectedCB) error {
+ xs.cbOnConnect = f
+ return nil
+}
+
// IsTempoID returns true when server as a temporary id
func (xs *XdsServer) IsTempoID() bool {
return strings.HasPrefix(xs.ID, _IDTempoPrefix)
xs.logOut = out
}
-// FolderAdd Send POST request to add a folder
-func (xs *XdsServer) FolderAdd(prj *FolderConfig, res interface{}) error {
- response, err := xs.HTTPPost("/folder", prj)
- if err != nil {
- return err
+// GetConfig
+func (xs *XdsServer) GetConfig() xaapiv1.ServerCfg {
+ return xaapiv1.ServerCfg{
+ ID: xs.ID,
+ URL: xs.BaseURL,
+ APIURL: xs.APIURL,
+ PartialURL: xs.PartialURL,
+ ConnRetry: xs.ConnRetry,
+ Connected: xs.Connected,
+ Disabled: xs.Disabled,
}
- if response.StatusCode != 200 {
- return fmt.Errorf("FolderAdd error status=%s", response.Status)
+}
+
+// SendCommand Send a command to XDS Server
+func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error {
+ url := cmd
+ if !strings.HasPrefix("/", cmd) {
+ url = "/" + cmd
}
- // Result is a FolderConfig that is equivalent to ProjectConfig
- err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
+ return xs.client.Post(url, string(body), res)
+}
+
+// GetVersion Send Get request to retrieve XDS Server version
+func (xs *XdsServer) GetVersion(res interface{}) error {
+ return xs.client.Get("/version", &res)
+}
+// GetFolders Send GET request to get current folder configuration
+func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error {
+ return xs.client.Get("/folders", folders)
+}
+
+// FolderAdd Send POST request to add a folder
+func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error {
+ err := xs.client.Post("/folders", fld, res)
+ if err != nil {
+ return fmt.Errorf("FolderAdd error: %s", err.Error())
+ }
return err
}
// FolderDelete Send DELETE request to delete a folder
func (xs *XdsServer) FolderDelete(id string) error {
- return xs.client.HTTPDelete("/folder/" + id)
+ return xs.client.HTTPDelete("/folders/" + id)
}
-// HTTPGet .
-func (xs *XdsServer) HTTPGet(url string, data interface{}) error {
- var dd []byte
- if err := xs.client.HTTPGet(url, &dd); err != nil {
- return err
- }
- return json.Unmarshal(dd, &data)
+// FolderSync Send POST request to force synchronization of a folder
+func (xs *XdsServer) FolderSync(id string) error {
+ return xs.client.HTTPPost("/folders/sync/"+id, "")
}
-// HTTPPost .
-func (xs *XdsServer) HTTPPost(url string, data interface{}) (*http.Response, error) {
- body, err := json.Marshal(data)
- if err != nil {
- return nil, err
- }
- return xs.HTTPPostBody(url, string(body))
+// FolderUpdate Send PUT request to update a folder
+func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error {
+ return xs.client.Put("/folders/"+fld.ID, fld, resFld)
+}
+
+// CommandExec Send POST request to execute a command
+func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error {
+ return xs.client.Post("/exec", args, res)
+}
+
+// CommandSignal Send POST request to send a signal to a command
+func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error {
+ return xs.client.Post("/signal", args, res)
+}
+
+// CommandTgtTerminalGet Send GET request to retrieve info of a target terminals
+func (xs *XdsServer) CommandTgtTerminalGet(targetID, termID string, res *xsapiv1.TerminalConfig) error {
+ return xs.client.Get("/targets/"+targetID+"/terminals/"+termID, res)
+}
+
+// CommandTgtTerminalOpen Send POST request to open a target terminal
+func (xs *XdsServer) CommandTgtTerminalOpen(targetID string, termID string, res *xsapiv1.TerminalConfig) error {
+ var empty interface{}
+ return xs.client.Post("/targets/"+targetID+"/terminals/"+termID+"/open", &empty, res)
}
-// HTTPPostBody .
-func (xs *XdsServer) HTTPPostBody(url string, body string) (*http.Response, error) {
- return xs.client.HTTPPostWithRes(url, body)
+// CommandTgtTerminalSignal Send POST request to send a signal to a target terminal
+func (xs *XdsServer) CommandTgtTerminalSignal(args *xsapiv1.TerminalSignalArgs, res *xsapiv1.TerminalConfig) error {
+ return xs.client.Post("/signal", args, res)
}
// SetAPIRouterGroup .
xs.apiRouter.GET(url, func(c *gin.Context) {
var data interface{}
- if err := xs.HTTPGet(url, &data); err != nil {
+ // Take care of param (eg. id in /projects/:id)
+ nURL := url
+ if strings.Contains(url, ":") {
+ nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
+ }
+ // Send Get request
+ if err := xs.client.Get(nURL, &data); err != nil {
if strings.Contains(err.Error(), "connection refused") {
- xs.Connected = false
- xs._NotifyState()
+ xs._Disconnected()
}
common.APIError(c, err.Error())
return
}
xs.apiRouter.POST(url, func(c *gin.Context) {
- bodyReq := []byte{}
- n, err := c.Request.Body.Read(bodyReq)
+ var err error
+ var data interface{}
+
+ // Get raw body
+ body, err := c.GetRawData()
if err != nil {
common.APIError(c, err.Error())
return
}
- response, err := xs.HTTPPostBody(url, string(bodyReq[:n]))
+ // Take care of param (eg. id in /projects/:id)
+ nURL := url
+ if strings.Contains(url, ":") {
+ nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
+ }
+
+ // Send Post request
+ response, err := xs.client.HTTPPostWithRes(nURL, string(body))
+ if err != nil {
+ goto httpError
+ }
+ if response.StatusCode != 200 {
+ err = fmt.Errorf(response.Status)
+ return
+ }
+ err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
+ if err != nil {
+ goto httpError
+ }
+
+ c.JSON(http.StatusOK, data)
+ return
+
+ /* Handle error case */
+ httpError:
+ if strings.Contains(err.Error(), "connection refused") {
+ xs._Disconnected()
+ }
+ common.APIError(c, err.Error())
+ return
+ })
+}
+
+// PassthroughPut Used to declare a route that sends directly a PUT request to XDS Server
+func (xs *XdsServer) PassthroughPut(url string) {
+ if xs.apiRouter == nil {
+ xs.Log.Errorf("apiRouter not set !")
+ return
+ }
+
+ xs.apiRouter.PUT(url, func(c *gin.Context) {
+ var err error
+ var data interface{}
+
+ // Get raw body
+ body, err := c.GetRawData()
if err != nil {
common.APIError(c, err.Error())
return
}
- bodyRes, err := ioutil.ReadAll(response.Body)
+
+ // Take care of param (eg. id in /projects/:id)
+ nURL := url
+ if strings.Contains(url, ":") {
+ nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
+ }
+
+ // Send Put request
+ response, err := xs.client.HTTPPutWithRes(nURL, string(body))
if err != nil {
- common.APIError(c, "Cannot read response body")
+ goto httpError
+ }
+ if response.StatusCode != 200 {
+ err = fmt.Errorf(response.Status)
return
}
- c.JSON(http.StatusOK, string(bodyRes))
+ err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
+ if err != nil {
+ goto httpError
+ }
+
+ c.JSON(http.StatusOK, data)
+ return
+
+ /* Handle error case */
+ httpError:
+ if strings.Contains(err.Error(), "connection refused") {
+ xs._Disconnected()
+ }
+ common.APIError(c, err.Error())
+ return
})
}
-// EventOn Register a callback on events reception
-func (xs *XdsServer) EventOn(message string, f interface{}) (err error) {
+// PassthroughDelete Used to declare a route that sends directly a Delete request to XDS Server
+func (xs *XdsServer) PassthroughDelete(url string) {
+ if xs.apiRouter == nil {
+ xs.Log.Errorf("apiRouter not set !")
+ return
+ }
+
+ xs.apiRouter.DELETE(url, func(c *gin.Context) {
+ var err error
+ var data interface{}
+
+ // Take care of param (eg. id in /projects/:id)
+ nURL := url
+ if strings.Contains(url, ":") {
+ nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
+ }
+
+ // Send Post request
+ response, err := xs.client.HTTPDeleteWithRes(nURL)
+ if err != nil {
+ goto httpError
+ }
+ if response.StatusCode != 200 {
+ err = fmt.Errorf(response.Status)
+ return
+ }
+ err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
+ if err != nil {
+ goto httpError
+ }
+
+ c.JSON(http.StatusOK, data)
+ return
+
+ /* Handle error case */
+ httpError:
+ if strings.Contains(err.Error(), "connection refused") {
+ xs._Disconnected()
+ }
+ common.APIError(c, err.Error())
+ return
+ })
+}
+
+// EventRegister Post a request to register to an XdsServer event
+func (xs *XdsServer) EventRegister(evName string, filter string) error {
+ return xs.client.Post("/events/register",
+ xsapiv1.EventRegisterArgs{
+ Name: evName,
+ Filter: filter,
+ },
+ nil)
+}
+
+// EventEmit Emit a event to XDS Server through WS
+func (xs *XdsServer) EventEmit(message string, args ...interface{}) error {
if xs.ioSock == nil {
return fmt.Errorf("Io.Socket not initialized")
}
- // FIXME SEB: support chain / multiple listeners
- /* sockEvents map[string][]*caller
+
+ return xs.ioSock.Emit(message, args...)
+}
+
+// EventOn Register a callback on events reception
+func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
+ if xs.ioSock == nil {
+ return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
+ }
+
xs.sockEventsLock.Lock()
- xs.sockEvents[message] = append(xs.sockEvents[message], f)
- xs.sockEventsLock.Unlock()
- xs.ioSock.On(message, func(ev) {
+ defer xs.sockEventsLock.Unlock()
+
+ if _, exist := xs.sockEvents[evName]; !exist {
+ // Register listener only the first time
+ evn := evName
+
+ err := xs.ioSock.On(evn, func(data interface{}) error {
+ xs.sockEventsLock.Lock()
+ sEvts := make([]*caller, len(xs.sockEvents[evn]))
+ copy(sEvts, xs.sockEvents[evn])
+ xs.sockEventsLock.Unlock()
+ for _, c := range sEvts {
+ c.Func(c.PrivateData, data)
+ }
+ return nil
+ })
+ if err != nil {
+ return uuid.Nil, err
+ }
+ }
- })
- */
- return xs.ioSock.On(message, f)
+ c := &caller{
+ id: uuid.NewV1(),
+ EventName: evName,
+ Func: f,
+ PrivateData: privData,
+ }
+
+ xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
+ xs.LogSillyf("XS EventOn: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
+ return c.id, nil
}
-// ProjectToFolder
-func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *FolderConfig {
+// EventOff Un-register a (or all) callbacks associated to an event
+func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
+ xs.sockEventsLock.Lock()
+ defer xs.sockEventsLock.Unlock()
+ if _, exist := xs.sockEvents[evName]; exist {
+ if id == uuid.Nil {
+ // Un-register all
+ xs.sockEvents[evName] = []*caller{}
+ } else {
+ // Un-register only the specified callback
+ for i, ff := range xs.sockEvents[evName] {
+ if uuid.Equal(ff.id, id) {
+ xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
+ break
+ }
+ }
+ }
+ }
+ xs.LogSillyf("XS EventOff: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
+ return nil
+}
+
+// ProjectToFolder Convert Project structure to Folder structure
+func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig {
stID := ""
- if pPrj.Type == XdsTypeCloudSync {
+ if pPrj.Type == xsapiv1.TypeCloudSync {
stID, _ = xs.SThg.IDGet()
}
- fPrj := FolderConfig{
+ // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
+ fPrj := xsapiv1.FolderConfig{
ID: pPrj.ID,
Label: pPrj.Label,
ClientPath: pPrj.ClientPath,
- Type: FolderType(pPrj.Type),
+ Type: xsapiv1.FolderType(pPrj.Type),
Status: pPrj.Status,
IsInSync: pPrj.IsInSync,
DefaultSdk: pPrj.DefaultSdk,
- DataPathMap: PathMapConfig{
+ ClientData: pPrj.ClientData,
+ DataPathMap: xsapiv1.PathMapConfig{
ServerPath: pPrj.ServerPath,
},
- DataCloudSync: CloudSyncConfig{
- SyncThingID: stID,
+ DataCloudSync: xsapiv1.CloudSyncConfig{
+ SyncThingID: stID,
+ STLocIsInSync: pPrj.IsInSync,
+ STLocStatus: pPrj.Status,
+ STSvrIsInSync: pPrj.IsInSync,
+ STSvrStatus: pPrj.Status,
},
}
+
return &fPrj
}
-// FolderToProject
-func (xs *XdsServer) FolderToProject(fPrj FolderConfig) ProjectConfig {
- pPrj := ProjectConfig{
+// FolderToProject Convert Folder structure to Project structure
+func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig {
+ inSync := fPrj.IsInSync
+ sts := fPrj.Status
+
+ if fPrj.Type == xsapiv1.TypeCloudSync {
+ inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
+
+ sts = fPrj.DataCloudSync.STSvrStatus
+ switch fPrj.DataCloudSync.STLocStatus {
+ case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause:
+ sts = fPrj.DataCloudSync.STLocStatus
+ break
+ case xaapiv1.StatusSyncing:
+ if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause {
+ sts = xaapiv1.StatusSyncing
+ }
+ break
+ case xaapiv1.StatusEnable:
+ // keep STSvrStatus
+ break
+ }
+ }
+
+ pPrj := xaapiv1.ProjectConfig{
ID: fPrj.ID,
ServerID: xs.ID,
Label: fPrj.Label,
ClientPath: fPrj.ClientPath,
ServerPath: fPrj.DataPathMap.ServerPath,
- Type: ProjectType(fPrj.Type),
- Status: fPrj.Status,
- IsInSync: fPrj.IsInSync,
+ Type: xaapiv1.ProjectType(fPrj.Type),
+ Status: sts,
+ IsInSync: inSync,
DefaultSdk: fPrj.DefaultSdk,
+ ClientData: fPrj.ClientData,
}
return pPrj
}
+// CommandAdd Add a new command to the list of running commands
+func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error {
+ if xs.CommandGet(cmdID) != nil {
+ return fmt.Errorf("command id already exist")
+ }
+ xs.cmdList[cmdID] = data
+ return nil
+}
+
+// CommandDelete Delete a command from the command list
+func (xs *XdsServer) CommandDelete(cmdID string) error {
+ if xs.CommandGet(cmdID) == nil {
+ return fmt.Errorf("unknown command id")
+ }
+ delete(xs.cmdList, cmdID)
+ return nil
+}
+
+// CommandGet Retrieve a command data
+func (xs *XdsServer) CommandGet(cmdID string) interface{} {
+ d, exist := xs.cmdList[cmdID]
+ if exist {
+ return d
+ }
+ return nil
+}
+
/***
** Private functions
***/
return nil
}
-// Re-established connection
-func (xs *XdsServer) _reconnect() error {
- err := xs._connect(true)
- if err == nil {
- // Reload projects list for this server
- err = xs.projects.Init(xs)
- }
+// _Reconnect Re-established connection
+func (xs *XdsServer) _Reconnect() error {
+
+ // Note that ConnectOn callback will be called (see apiv1.go file)
+ err := xs._Connect(true)
+
return err
}
-// Established HTTP and WS connection and retrieve XDSServer config
-func (xs *XdsServer) _connect(reConn bool) error {
+// _Connect Established HTTP and WS connection and retrieve XDSServer config
+func (xs *XdsServer) _Connect(reConn bool) error {
- xdsCfg := xdsServerConfig{}
- if err := xs.HTTPGet("/config", &xdsCfg); err != nil {
+ xdsCfg := xsapiv1.APIConfig{}
+ if err := xs.client.Get("/config", &xdsCfg); err != nil {
xs.Connected = false
if !reConn {
xs._NotifyState()
return err
}
- if reConn && xs.ID != xdsCfg.ID {
- xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
+ if reConn && xs.ID != xdsCfg.ServerUID {
+ xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID)
}
// Update local XDS config
- xs.ID = xdsCfg.ID
+ xs.ID = xdsCfg.ServerUID
xs.ServerConfig = &xdsCfg
// Establish WS connection and register listen
if err := xs._SocketConnect(); err != nil {
- xs.Connected = false
- xs._NotifyState()
+ xs._Disconnected()
return err
}
xs.Connected = true
+
+ // Call OnConnect callback
+ if xs.cbOnConnect != nil {
+ xs.cbOnConnect(xs)
+ }
+
xs._NotifyState()
return nil
}
-// Create WebSocket (io.socket) connection
+// _SocketConnect Create WebSocket (io.socket) connection
func (xs *XdsServer) _SocketConnect() error {
xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
})
iosk.On("disconnection", func(err error) {
- xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
+ xs.Log.Infof("IO.socket disconnection server %s (APIURL %s)", xs.ID, xs.APIURL)
if xs.CBOnDisconnect != nil {
xs.CBOnDisconnect(err)
}
- xs.Connected = false
- xs._NotifyState()
+ xs._Disconnected()
// Try to reconnect during 15min (or at least while not disabled)
go func() {
time.Sleep(time.Second * time.Duration(waitTime))
xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
- xs._reconnect()
+ err := xs._Reconnect()
+ if err != nil &&
+ !(strings.Contains(err.Error(), "dial tcp") && strings.Contains(err.Error(), "connection refused")) {
+ xs.Log.Errorf("ERROR while reconnecting: %v", err.Error())
+ }
+
}
}()
})
return nil
}
-// Send event to notify changes
+// _Disconnected Set XDS Server as disconnected
+func (xs *XdsServer) _Disconnected() error {
+ // Clear all register events as socket is closed
+ xs.sockEventsLock.Lock()
+ defer xs.sockEventsLock.Unlock()
+
+ for k := range xs.sockEvents {
+ delete(xs.sockEvents, k)
+ }
+ xs.Connected = false
+ xs.ioSock = nil
+ xs._NotifyState()
+ return nil
+}
+
+// _NotifyState Send event to notify changes
func (xs *XdsServer) _NotifyState() {
- evSts := ServerCfg{
+ evSts := xaapiv1.ServerCfg{
ID: xs.ID,
URL: xs.BaseURL,
APIURL: xs.APIURL,
ConnRetry: xs.ConnRetry,
Connected: xs.Connected,
}
- if err := xs.events.Emit(EVTServerConfig, evSts); err != nil {
+ if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil {
xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
}
}