X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=lib%2Fagent%2Fxdsserver.go;h=40ee57b4b5ceef0d4a130d2c417a9c7e18cc12e2;hb=247bb7c2db5f0d48178398599348249bf886ebbc;hp=014415fe349470af224523d29b3261f6519fbac9;hpb=97ca1f277dc8b6973d6fa67add5593a9c395ce60;p=src%2Fxds%2Fxds-agent.git diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go index 014415f..40ee57b 100644 --- a/lib/agent/xdsserver.go +++ b/lib/agent/xdsserver.go @@ -1,92 +1,80 @@ +/* + * Copyright (C) 2017-2018 "IoT.bzh" + * Author Sebastien Douheret + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package agent import ( "encoding/json" "fmt" "io" - "io/ioutil" "net/http" "strings" + "sync" "time" + "gerrit.automotivelinux.org/gerrit/src/xds/xds-agent.git/lib/xaapiv1" + "gerrit.automotivelinux.org/gerrit/src/xds/xds-agent.git/lib/xdsconfig" + common "gerrit.automotivelinux.org/gerrit/src/xds/xds-common.git" + "gerrit.automotivelinux.org/gerrit/src/xds/xds-server.git/lib/xsapiv1" "github.com/gin-gonic/gin" - "github.com/iotbzh/xds-agent/lib/xdsconfig" - common "github.com/iotbzh/xds-common/golib" uuid "github.com/satori/go.uuid" - sio_client "github.com/zhouhui8915/go-socket.io-client" + sio_client "github.com/sebd71/go-socket.io-client" ) -// Server . +// XdsServer . type XdsServer struct { *Context ID string + URLIndex string BaseURL string APIURL string PartialURL string ConnRetry int Connected bool Disabled bool - ServerConfig *xdsServerConfig + ServerConfig *xsapiv1.APIConfig - // callbacks + // Events management CBOnError func(error) CBOnDisconnect func(error) + sockEvents map[string][]*caller + sockEventsLock *sync.Mutex // Private fields - client *common.HTTPClient - ioSock *sio_client.Client - logOut io.Writer - apiRouter *gin.RouterGroup -} - -// xdsServerConfig Data return by GET /config -type xdsServerConfig struct { - ID string `json:"id"` - Version string `json:"version"` - APIVersion string `json:"apiVersion"` - VersionGitTag string `json:"gitTag"` - SupportedSharing map[string]bool `json:"supportedSharing"` - Builder xdsBuilderConfig `json:"builder"` + client *common.HTTPClient + ioSock *sio_client.Client + logOut io.Writer + apiRouter *gin.RouterGroup + cmdList map[string]interface{} + cbOnConnect OnConnectedCB } -// xdsBuilderConfig represents the builder container configuration -type xdsBuilderConfig struct { - IP string `json:"ip"` - Port string `json:"port"` - SyncThingID string `json:"syncThingID"` -} - -// FolderType XdsServer folder type -type FolderType string - -const ( - XdsTypePathMap = "PathMap" - XdsTypeCloudSync = "CloudSync" - XdsTypeCifsSmb = "CIFS" -) - -// FolderConfig XdsServer folder config -type FolderConfig struct { - ID string `json:"id"` - Label string `json:"label"` - ClientPath string `json:"path"` - Type FolderType `json:"type"` - Status string `json:"status"` - IsInSync bool `json:"isInSync"` - DefaultSdk string `json:"defaultSdk"` - // Specific data depending on which Type is used - DataPathMap PathMapConfig `json:"dataPathMap,omitempty"` - DataCloudSync CloudSyncConfig `json:"dataCloudSync,omitempty"` -} +// EventCB Event emitter callback +type EventCB func(privData interface{}, evtData interface{}) error -// PathMapConfig Path mapping specific data -type PathMapConfig struct { - ServerPath string `json:"serverPath"` -} +// OnConnectedCB connect callback +type OnConnectedCB func(svr *XdsServer) error -// CloudSyncConfig CloudSync (AKA Syncthing) specific data -type CloudSyncConfig struct { - SyncThingID string `json:"syncThingID"` +// caller Used to chain event listeners +type caller struct { + id uuid.UUID + EventName string + Func EventCB + PrivateData interface{} } const _IDTempoPrefix = "tempo-" @@ -96,6 +84,7 @@ func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer { return &XdsServer{ Context: ctx, ID: _IDTempoPrefix + uuid.NewV1().String(), + URLIndex: conf.URLIndex, BaseURL: conf.URL, APIURL: conf.APIBaseURL + conf.APIPartialURL, PartialURL: conf.APIPartialURL, @@ -103,17 +92,18 @@ func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer { Connected: false, Disabled: false, - logOut: ctx.Log.Out, + sockEvents: make(map[string][]*caller), + sockEventsLock: &sync.Mutex{}, + logOut: ctx.Log.Out, + cmdList: make(map[string]interface{}), } } // Close Free and close XDS Server connection func (xs *XdsServer) Close() error { - xs.Connected = false + err := xs._Disconnected() xs.Disabled = true - xs.ioSock = nil - xs._NotifyState() - return nil + return err } // Connect Establish HTTP connection with XDS Server @@ -138,7 +128,7 @@ func (xs *XdsServer) Connect() error { time.Sleep(time.Second) } if retry == 0 { - // FIXME SEB: re-use _reconnect to wait longer in background + // FIXME: re-use _Reconnect to wait longer in background return fmt.Errorf("Connection to XDS Server failure") } if err != nil { @@ -146,11 +136,17 @@ func (xs *XdsServer) Connect() error { } // Check HTTP connection and establish WS connection - err = xs._connect(false) + err = xs._Connect(false) return err } +// ConnectOn Register a callback on events reception +func (xs *XdsServer) ConnectOn(f OnConnectedCB) error { + xs.cbOnConnect = f + return nil +} + // IsTempoID returns true when server as a temporary id func (xs *XdsServer) IsTempoID() bool { return strings.HasPrefix(xs.ID, _IDTempoPrefix) @@ -161,47 +157,86 @@ func (xs *XdsServer) SetLoggerOutput(out io.Writer) { xs.logOut = out } -// FolderAdd Send POST request to add a folder -func (xs *XdsServer) FolderAdd(prj *FolderConfig, res interface{}) error { - response, err := xs.HTTPPost("/folder", prj) - if err != nil { - return err +// GetConfig return the current server config +func (xs *XdsServer) GetConfig() xaapiv1.ServerCfg { + return xaapiv1.ServerCfg{ + ID: xs.ID, + URL: xs.BaseURL, + APIURL: xs.APIURL, + PartialURL: xs.PartialURL, + ConnRetry: xs.ConnRetry, + Connected: xs.Connected, + Disabled: xs.Disabled, } - if response.StatusCode != 200 { - return fmt.Errorf("FolderAdd error status=%s", response.Status) +} + +// SendCommand Send a command to XDS Server +func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error { + url := cmd + if !strings.HasPrefix("/", cmd) { + url = "/" + cmd } - // Result is a FolderConfig that is equivalent to ProjectConfig - err = json.Unmarshal(xs.client.ResponseToBArray(response), res) + return xs.client.Post(url, string(body), res) +} + +// GetVersion Send Get request to retrieve XDS Server version +func (xs *XdsServer) GetVersion(res interface{}) error { + return xs.client.Get("/version", &res) +} +// GetFolders Send GET request to get current folder configuration +func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error { + return xs.client.Get("/folders", folders) +} + +// FolderAdd Send POST request to add a folder +func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error { + err := xs.client.Post("/folders", fld, res) + if err != nil { + return fmt.Errorf("FolderAdd error: %s", err.Error()) + } return err } // FolderDelete Send DELETE request to delete a folder func (xs *XdsServer) FolderDelete(id string) error { - return xs.client.HTTPDelete("/folder/" + id) + return xs.client.HTTPDelete("/folders/" + id) } -// HTTPGet . -func (xs *XdsServer) HTTPGet(url string, data interface{}) error { - var dd []byte - if err := xs.client.HTTPGet(url, &dd); err != nil { - return err - } - return json.Unmarshal(dd, &data) +// FolderSync Send POST request to force synchronization of a folder +func (xs *XdsServer) FolderSync(id string) error { + return xs.client.HTTPPost("/folders/sync/"+id, "") } -// HTTPPost . -func (xs *XdsServer) HTTPPost(url string, data interface{}) (*http.Response, error) { - body, err := json.Marshal(data) - if err != nil { - return nil, err - } - return xs.HTTPPostBody(url, string(body)) +// FolderUpdate Send PUT request to update a folder +func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error { + return xs.client.Put("/folders/"+fld.ID, fld, resFld) +} + +// CommandExec Send POST request to execute a command +func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error { + return xs.client.Post("/exec", args, res) +} + +// CommandSignal Send POST request to send a signal to a command +func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error { + return xs.client.Post("/signal", args, res) +} + +// CommandTgtTerminalGet Send GET request to retrieve info of a target terminals +func (xs *XdsServer) CommandTgtTerminalGet(targetID, termID string, res *xsapiv1.TerminalConfig) error { + return xs.client.Get("/targets/"+targetID+"/terminals/"+termID, res) +} + +// CommandTgtTerminalOpen Send POST request to open a target terminal +func (xs *XdsServer) CommandTgtTerminalOpen(targetID string, termID string, res *xsapiv1.TerminalConfig) error { + var empty interface{} + return xs.client.Post("/targets/"+targetID+"/terminals/"+termID+"/open", &empty, res) } -// HTTPPostBody . -func (xs *XdsServer) HTTPPostBody(url string, body string) (*http.Response, error) { - return xs.client.HTTPPostWithRes(url, body) +// CommandTgtTerminalSignal Send POST request to send a signal to a target terminal +func (xs *XdsServer) CommandTgtTerminalSignal(args *xsapiv1.TerminalSignalArgs, res *xsapiv1.TerminalConfig) error { + return xs.client.Post("/signal", args, res) } // SetAPIRouterGroup . @@ -218,10 +253,15 @@ func (xs *XdsServer) PassthroughGet(url string) { xs.apiRouter.GET(url, func(c *gin.Context) { var data interface{} - if err := xs.HTTPGet(url, &data); err != nil { + // Take care of param (eg. id in /projects/:id) + nURL := url + if strings.Contains(url, ":") { + nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL) + } + // Send Get request + if err := xs.client.Get(nURL, &data); err != nil { if strings.Contains(err.Error(), "connection refused") { - xs.Connected = false - xs._NotifyState() + xs._Disconnected() } common.APIError(c, err.Error()) return @@ -239,84 +279,322 @@ func (xs *XdsServer) PassthroughPost(url string) { } xs.apiRouter.POST(url, func(c *gin.Context) { - bodyReq := []byte{} - n, err := c.Request.Body.Read(bodyReq) + var err error + var data interface{} + + // Get raw body + body, err := c.GetRawData() if err != nil { common.APIError(c, err.Error()) return } - response, err := xs.HTTPPostBody(url, string(bodyReq[:n])) + // Take care of param (eg. id in /projects/:id) + nURL := url + if strings.Contains(url, ":") { + nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL) + } + + // Send Post request + response, err := xs.client.HTTPPostWithRes(nURL, string(body)) + if err != nil { + goto httpError + } + if response.StatusCode != 200 { + err = fmt.Errorf(response.Status) + return + } + err = json.Unmarshal(xs.client.ResponseToBArray(response), &data) + if err != nil { + goto httpError + } + + c.JSON(http.StatusOK, data) + return + + /* Handle error case */ + httpError: + if strings.Contains(err.Error(), "connection refused") { + xs._Disconnected() + } + common.APIError(c, err.Error()) + return + }) +} + +// PassthroughPut Used to declare a route that sends directly a PUT request to XDS Server +func (xs *XdsServer) PassthroughPut(url string) { + if xs.apiRouter == nil { + xs.Log.Errorf("apiRouter not set !") + return + } + + xs.apiRouter.PUT(url, func(c *gin.Context) { + var err error + var data interface{} + + // Get raw body + body, err := c.GetRawData() if err != nil { common.APIError(c, err.Error()) return } - bodyRes, err := ioutil.ReadAll(response.Body) + + // Take care of param (eg. id in /projects/:id) + nURL := url + if strings.Contains(url, ":") { + nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL) + } + + // Send Put request + response, err := xs.client.HTTPPutWithRes(nURL, string(body)) if err != nil { - common.APIError(c, "Cannot read response body") + goto httpError + } + if response.StatusCode != 200 { + err = fmt.Errorf(response.Status) return } - c.JSON(http.StatusOK, string(bodyRes)) + err = json.Unmarshal(xs.client.ResponseToBArray(response), &data) + if err != nil { + goto httpError + } + + c.JSON(http.StatusOK, data) + return + + /* Handle error case */ + httpError: + if strings.Contains(err.Error(), "connection refused") { + xs._Disconnected() + } + common.APIError(c, err.Error()) + return }) } -// EventOn Register a callback on events reception -func (xs *XdsServer) EventOn(message string, f interface{}) (err error) { +// PassthroughDelete Used to declare a route that sends directly a Delete request to XDS Server +func (xs *XdsServer) PassthroughDelete(url string) { + if xs.apiRouter == nil { + xs.Log.Errorf("apiRouter not set !") + return + } + + xs.apiRouter.DELETE(url, func(c *gin.Context) { + var err error + var data interface{} + + // Take care of param (eg. id in /projects/:id) + nURL := url + if strings.Contains(url, ":") { + nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL) + } + + // Send Post request + response, err := xs.client.HTTPDeleteWithRes(nURL) + if err != nil { + goto httpError + } + if response.StatusCode != 200 { + err = fmt.Errorf(response.Status) + return + } + err = json.Unmarshal(xs.client.ResponseToBArray(response), &data) + if err != nil { + goto httpError + } + + c.JSON(http.StatusOK, data) + return + + /* Handle error case */ + httpError: + if strings.Contains(err.Error(), "connection refused") { + xs._Disconnected() + } + common.APIError(c, err.Error()) + return + }) +} + +// EventRegister Post a request to register to an XdsServer event +func (xs *XdsServer) EventRegister(evName string, filter string) error { + return xs.client.Post("/events/register", + xsapiv1.EventRegisterArgs{ + Name: evName, + Filter: filter, + }, + nil) +} + +// EventEmit Emit a event to XDS Server through WS +func (xs *XdsServer) EventEmit(message string, args ...interface{}) error { if xs.ioSock == nil { return fmt.Errorf("Io.Socket not initialized") } - // FIXME SEB: support chain / multiple listeners - /* sockEvents map[string][]*caller + + return xs.ioSock.Emit(message, args...) +} + +// EventOn Register a callback on events reception +func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) { + if xs.ioSock == nil { + return uuid.Nil, fmt.Errorf("Io.Socket not initialized") + } + xs.sockEventsLock.Lock() - xs.sockEvents[message] = append(xs.sockEvents[message], f) - xs.sockEventsLock.Unlock() - xs.ioSock.On(message, func(ev) { + defer xs.sockEventsLock.Unlock() + + if _, exist := xs.sockEvents[evName]; !exist { + // Register listener only the first time + evn := evName + + err := xs.ioSock.On(evn, func(data interface{}) error { + xs.sockEventsLock.Lock() + sEvts := make([]*caller, len(xs.sockEvents[evn])) + copy(sEvts, xs.sockEvents[evn]) + xs.sockEventsLock.Unlock() + for _, c := range sEvts { + c.Func(c.PrivateData, data) + } + return nil + }) + if err != nil { + return uuid.Nil, err + } + } - }) - */ - return xs.ioSock.On(message, f) + c := &caller{ + id: uuid.NewV1(), + EventName: evName, + Func: f, + PrivateData: privData, + } + + xs.sockEvents[evName] = append(xs.sockEvents[evName], c) + xs.LogSillyf("XS EventOn: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName])) + return c.id, nil } -// ProjectToFolder -func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *FolderConfig { +// EventOff Un-register a (or all) callbacks associated to an event +func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error { + xs.sockEventsLock.Lock() + defer xs.sockEventsLock.Unlock() + if _, exist := xs.sockEvents[evName]; exist { + if id == uuid.Nil { + // Un-register all + xs.sockEvents[evName] = []*caller{} + } else { + // Un-register only the specified callback + for i, ff := range xs.sockEvents[evName] { + if uuid.Equal(ff.id, id) { + xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...) + break + } + } + } + } + xs.LogSillyf("XS EventOff: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName])) + return nil +} + +// ProjectToFolder Convert Project structure to Folder structure +func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig { stID := "" - if pPrj.Type == XdsTypeCloudSync { + if pPrj.Type == xsapiv1.TypeCloudSync { stID, _ = xs.SThg.IDGet() } - fPrj := FolderConfig{ + // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/) + fPrj := xsapiv1.FolderConfig{ ID: pPrj.ID, Label: pPrj.Label, ClientPath: pPrj.ClientPath, - Type: FolderType(pPrj.Type), + Type: xsapiv1.FolderType(pPrj.Type), Status: pPrj.Status, IsInSync: pPrj.IsInSync, DefaultSdk: pPrj.DefaultSdk, - DataPathMap: PathMapConfig{ + ClientData: pPrj.ClientData, + DataPathMap: xsapiv1.PathMapConfig{ ServerPath: pPrj.ServerPath, }, - DataCloudSync: CloudSyncConfig{ - SyncThingID: stID, + DataCloudSync: xsapiv1.CloudSyncConfig{ + SyncThingID: stID, + STLocIsInSync: pPrj.IsInSync, + STLocStatus: pPrj.Status, + STSvrIsInSync: pPrj.IsInSync, + STSvrStatus: pPrj.Status, }, } + return &fPrj } -// FolderToProject -func (xs *XdsServer) FolderToProject(fPrj FolderConfig) ProjectConfig { - pPrj := ProjectConfig{ +// FolderToProject Convert Folder structure to Project structure +func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig { + inSync := fPrj.IsInSync + sts := fPrj.Status + + if fPrj.Type == xsapiv1.TypeCloudSync { + inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync + + sts = fPrj.DataCloudSync.STSvrStatus + switch fPrj.DataCloudSync.STLocStatus { + case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause: + sts = fPrj.DataCloudSync.STLocStatus + break + case xaapiv1.StatusSyncing: + if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause { + sts = xaapiv1.StatusSyncing + } + break + case xaapiv1.StatusEnable: + // keep STSvrStatus + break + } + } + + pPrj := xaapiv1.ProjectConfig{ ID: fPrj.ID, ServerID: xs.ID, Label: fPrj.Label, ClientPath: fPrj.ClientPath, ServerPath: fPrj.DataPathMap.ServerPath, - Type: ProjectType(fPrj.Type), - Status: fPrj.Status, - IsInSync: fPrj.IsInSync, + Type: xaapiv1.ProjectType(fPrj.Type), + Status: sts, + IsInSync: inSync, DefaultSdk: fPrj.DefaultSdk, + ClientData: fPrj.ClientData, } return pPrj } +// CommandAdd Add a new command to the list of running commands +func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error { + if xs.CommandGet(cmdID) != nil { + return fmt.Errorf("command id already exist") + } + xs.cmdList[cmdID] = data + return nil +} + +// CommandDelete Delete a command from the command list +func (xs *XdsServer) CommandDelete(cmdID string) error { + if xs.CommandGet(cmdID) == nil { + return fmt.Errorf("unknown command id") + } + delete(xs.cmdList, cmdID) + return nil +} + +// CommandGet Retrieve a command data +func (xs *XdsServer) CommandGet(cmdID string) interface{} { + d, exist := xs.cmdList[cmdID] + if exist { + return d + } + return nil +} + /*** ** Private functions ***/ @@ -350,21 +628,20 @@ func (xs *XdsServer) _CreateConnectHTTP() error { return nil } -// Re-established connection -func (xs *XdsServer) _reconnect() error { - err := xs._connect(true) - if err == nil { - // Reload projects list for this server - err = xs.projects.Init(xs) - } +// _Reconnect Re-established connection +func (xs *XdsServer) _Reconnect() error { + + // Note that ConnectOn callback will be called (see apiv1.go file) + err := xs._Connect(true) + return err } -// Established HTTP and WS connection and retrieve XDSServer config -func (xs *XdsServer) _connect(reConn bool) error { +// _Connect Established HTTP and WS connection and retrieve XDSServer config +func (xs *XdsServer) _Connect(reConn bool) error { - xdsCfg := xdsServerConfig{} - if err := xs.HTTPGet("/config", &xdsCfg); err != nil { + xdsCfg := xsapiv1.APIConfig{} + if err := xs.client.Get("/config", &xdsCfg); err != nil { xs.Connected = false if !reConn { xs._NotifyState() @@ -372,27 +649,32 @@ func (xs *XdsServer) _connect(reConn bool) error { return err } - if reConn && xs.ID != xdsCfg.ID { - xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID) + if reConn && xs.ID != xdsCfg.ServerUID { + xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID) } // Update local XDS config - xs.ID = xdsCfg.ID + xs.ID = xdsCfg.ServerUID xs.ServerConfig = &xdsCfg // Establish WS connection and register listen if err := xs._SocketConnect(); err != nil { - xs.Connected = false - xs._NotifyState() + xs._Disconnected() return err } xs.Connected = true + + // Call OnConnect callback + if xs.cbOnConnect != nil { + xs.cbOnConnect(xs) + } + xs._NotifyState() return nil } -// Create WebSocket (io.socket) connection +// _SocketConnect Create WebSocket (io.socket) connection func (xs *XdsServer) _SocketConnect() error { xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL) @@ -419,12 +701,11 @@ func (xs *XdsServer) _SocketConnect() error { }) iosk.On("disconnection", func(err error) { - xs.Log.Infof("IO.socket disconnection server %s", xs.ID) + xs.Log.Infof("IO.socket disconnection server %s (APIURL %s)", xs.ID, xs.APIURL) if xs.CBOnDisconnect != nil { xs.CBOnDisconnect(err) } - xs.Connected = false - xs._NotifyState() + xs._Disconnected() // Try to reconnect during 15min (or at least while not disabled) go func() { @@ -442,7 +723,12 @@ func (xs *XdsServer) _SocketConnect() error { time.Sleep(time.Second * time.Duration(waitTime)) xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count) - xs._reconnect() + err := xs._Reconnect() + if err != nil && + !(strings.Contains(err.Error(), "dial tcp") && strings.Contains(err.Error(), "connection refused")) { + xs.Log.Errorf("ERROR while reconnecting: %v", err.Error()) + } + } }() }) @@ -455,10 +741,25 @@ func (xs *XdsServer) _SocketConnect() error { return nil } -// Send event to notify changes +// _Disconnected Set XDS Server as disconnected +func (xs *XdsServer) _Disconnected() error { + // Clear all register events as socket is closed + xs.sockEventsLock.Lock() + defer xs.sockEventsLock.Unlock() + + for k := range xs.sockEvents { + delete(xs.sockEvents, k) + } + xs.Connected = false + xs.ioSock = nil + xs._NotifyState() + return nil +} + +// _NotifyState Send event to notify changes func (xs *XdsServer) _NotifyState() { - evSts := ServerCfg{ + evSts := xaapiv1.ServerCfg{ ID: xs.ID, URL: xs.BaseURL, APIURL: xs.APIURL, @@ -466,7 +767,7 @@ func (xs *XdsServer) _NotifyState() { ConnRetry: xs.ConnRetry, Connected: xs.Connected, } - if err := xs.events.Emit(EVTServerConfig, evSts); err != nil { + if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil { xs.Log.Warningf("Cannot notify XdsServer state change: %v", err) } }