X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=lib%2Fagent%2Fxdsserver.go;h=bca4b66fc6b6c88bdffb1673d349e4ff29d3c857;hb=32791ffed5bdfaa698e90f9c067dc6e8ababbfc3;hp=37604b51ba7ac671667c454984e24f66ef8ff4b6;hpb=f1886fb200406a6433669649c4f2448904876ec2;p=src%2Fxds%2Fxds-agent.git diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go index 37604b5..bca4b66 100644 --- a/lib/agent/xdsserver.go +++ b/lib/agent/xdsserver.go @@ -11,9 +11,10 @@ import ( "time" "github.com/gin-gonic/gin" - "github.com/iotbzh/xds-agent/lib/apiv1" + "github.com/iotbzh/xds-agent/lib/xaapiv1" "github.com/iotbzh/xds-agent/lib/xdsconfig" common "github.com/iotbzh/xds-common/golib" + "github.com/iotbzh/xds-server/lib/xsapiv1" uuid "github.com/satori/go.uuid" sio_client "github.com/sebd71/go-socket.io-client" ) @@ -28,7 +29,7 @@ type XdsServer struct { ConnRetry int Connected bool Disabled bool - ServerConfig *XdsServerConfig + ServerConfig *xsapiv1.APIConfig // Events management CBOnError func(error) @@ -41,82 +42,18 @@ type XdsServer struct { ioSock *sio_client.Client logOut io.Writer apiRouter *gin.RouterGroup + cmdList map[string]interface{} } -// XdsServerConfig Data return by GET /config -type XdsServerConfig struct { - ID string `json:"id"` - Version string `json:"version"` - APIVersion string `json:"apiVersion"` - VersionGitTag string `json:"gitTag"` - SupportedSharing map[string]bool `json:"supportedSharing"` - Builder XdsBuilderConfig `json:"builder"` -} - -// XdsBuilderConfig represents the builder container configuration -type XdsBuilderConfig struct { - IP string `json:"ip"` - Port string `json:"port"` - SyncThingID string `json:"syncThingID"` -} - -// XdsFolderType XdsServer folder type -type XdsFolderType string - -const ( - XdsTypePathMap = "PathMap" - XdsTypeCloudSync = "CloudSync" - XdsTypeCifsSmb = "CIFS" -) - -// XdsFolderConfig XdsServer folder config -type XdsFolderConfig struct { - ID string `json:"id"` - Label string `json:"label"` - ClientPath string `json:"path"` - Type XdsFolderType `json:"type"` - Status string `json:"status"` - IsInSync bool `json:"isInSync"` - DefaultSdk string `json:"defaultSdk"` - // Specific data depending on which Type is used - DataPathMap XdsPathMapConfig `json:"dataPathMap,omitempty"` - DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"` -} - -// XdsPathMapConfig Path mapping specific data -type XdsPathMapConfig struct { - ServerPath string `json:"serverPath"` - CheckFile string `json:"checkFile"` - CheckContent string `json:"checkContent"` -} - -// XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data -type XdsCloudSyncConfig struct { - SyncThingID string `json:"syncThingID"` - STSvrStatus string `json:"-"` - STSvrIsInSync bool `json:"-"` - STLocStatus string `json:"-"` - STLocIsInSync bool `json:"-"` -} - -// XdsEventRegisterArgs arguments used to register to XDS server events -type XdsEventRegisterArgs struct { - Name string `json:"name"` - ProjectID string `json:"filterProjectID"` -} - -// XdsEventFolderChange Folder change event structure -type XdsEventFolderChange struct { - Time string `json:"time"` - Type string `json:"type"` - Folder XdsFolderConfig `json:"folder"` -} +// EventCB Event emitter callback +type EventCB func(privData interface{}, evtData interface{}) error // caller Used to chain event listeners type caller struct { - id uuid.UUID - EventName string - Func func(interface{}) + id uuid.UUID + EventName string + Func EventCB + PrivateData interface{} } const _IDTempoPrefix = "tempo-" @@ -136,6 +73,7 @@ func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer { sockEvents: make(map[string][]*caller), sockEventsLock: &sync.Mutex{}, logOut: ctx.Log.Out, + cmdList: make(map[string]interface{}), } } @@ -194,47 +132,56 @@ func (xs *XdsServer) SetLoggerOutput(out io.Writer) { } // SendCommand Send a command to XDS Server -func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) { +func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error { url := cmd if !strings.HasPrefix("/", cmd) { url = "/" + cmd } - return xs.client.HTTPPostWithRes(url, string(body)) + return xs.client.Post(url, string(body), res) } // GetVersion Send Get request to retrieve XDS Server version func (xs *XdsServer) GetVersion(res interface{}) error { - return xs._HTTPGet("/version", &res) + return xs.client.Get("/version", &res) } // GetFolders Send GET request to get current folder configuration -func (xs *XdsServer) GetFolders(folders *[]XdsFolderConfig) error { - return xs._HTTPGet("/folders", folders) +func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error { + return xs.client.Get("/folders", folders) } // FolderAdd Send POST request to add a folder -func (xs *XdsServer) FolderAdd(fld *XdsFolderConfig, res interface{}) error { - response, err := xs._HTTPPost("/folder", fld) +func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error { + err := xs.client.Post("/folders", fld, res) if err != nil { - return err + return fmt.Errorf("FolderAdd error: %s", err.Error()) } - if response.StatusCode != 200 { - return fmt.Errorf("FolderAdd error status=%s", response.Status) - } - // Result is a XdsFolderConfig that is equivalent to ProjectConfig - err = json.Unmarshal(xs.client.ResponseToBArray(response), res) - return err } // FolderDelete Send DELETE request to delete a folder func (xs *XdsServer) FolderDelete(id string) error { - return xs.client.HTTPDelete("/folder/" + id) + return xs.client.HTTPDelete("/folders/" + id) } // FolderSync Send POST request to force synchronization of a folder func (xs *XdsServer) FolderSync(id string) error { - return xs.client.HTTPPost("/folder/sync/"+id, "") + return xs.client.HTTPPost("/folders/sync/"+id, "") +} + +// FolderUpdate Send PUT request to update a folder +func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error { + return xs.client.Put("/folders/"+fld.ID, fld, resFld) +} + +// CommandExec Send POST request to execute a command +func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error { + return xs.client.Post("/exec", args, res) +} + +// CommandSignal Send POST request to send a signal to a command +func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error { + return xs.client.Post("/signal", args, res) } // SetAPIRouterGroup . @@ -257,7 +204,7 @@ func (xs *XdsServer) PassthroughGet(url string) { nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL) } // Send Get request - if err := xs._HTTPGet(nURL, &data); err != nil { + if err := xs.client.Get(nURL, &data); err != nil { if strings.Contains(err.Error(), "connection refused") { xs.Connected = false xs._NotifyState() @@ -290,12 +237,20 @@ func (xs *XdsServer) PassthroughPost(url string) { if strings.Contains(url, ":") { nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL) } + // Send Post request - response, err := xs._HTTPPost(nURL, bodyReq[:n]) + body, err := json.Marshal(bodyReq[:n]) + if err != nil { + common.APIError(c, err.Error()) + return + } + + response, err := xs.client.HTTPPostWithRes(nURL, string(body)) if err != nil { common.APIError(c, err.Error()) return } + bodyRes, err := ioutil.ReadAll(response.Body) if err != nil { common.APIError(c, "Cannot read response body") @@ -307,16 +262,26 @@ func (xs *XdsServer) PassthroughPost(url string) { // EventRegister Post a request to register to an XdsServer event func (xs *XdsServer) EventRegister(evName string, id string) error { - var err error - _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{ - Name: evName, - ProjectID: id, - }) - return err + return xs.client.Post( + "/events/register", + xsapiv1.EventRegisterArgs{ + Name: evName, + ProjectID: id, + }, + nil) +} + +// EventEmit Emit a event to XDS Server through WS +func (xs *XdsServer) EventEmit(message string, args ...interface{}) error { + if xs.ioSock == nil { + return fmt.Errorf("Io.Socket not initialized") + } + + return xs.ioSock.Emit(message, args...) } // EventOn Register a callback on events reception -func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, error) { +func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) { if xs.ioSock == nil { return uuid.Nil, fmt.Errorf("Io.Socket not initialized") } @@ -328,19 +293,30 @@ func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, err // Register listener only the first time evn := evName - // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange + // FIXME: use generic type: data interface{} instead of data xsapiv1.EventMsg var err error - if evName == "event:FolderStateChanged" { - err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error { + if evName == "event:folder-state-change" { + err = xs.ioSock.On(evn, func(data xsapiv1.EventMsg) error { xs.sockEventsLock.Lock() - defer xs.sockEventsLock.Unlock() - for _, c := range xs.sockEvents[evn] { - c.Func(data) + sEvts := make([]*caller, len(xs.sockEvents[evn])) + copy(sEvts, xs.sockEvents[evn]) + xs.sockEventsLock.Unlock() + for _, c := range sEvts { + c.Func(c.PrivateData, data) } return nil }) } else { - err = xs.ioSock.On(evn, f) + err = xs.ioSock.On(evn, func(data interface{}) error { + xs.sockEventsLock.Lock() + sEvts := make([]*caller, len(xs.sockEvents[evn])) + copy(sEvts, xs.sockEvents[evn]) + xs.sockEventsLock.Unlock() + for _, c := range sEvts { + c.Func(c.PrivateData, data) + } + return nil + }) } if err != nil { return uuid.Nil, err @@ -348,9 +324,10 @@ func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, err } c := &caller{ - id: uuid.NewV1(), - EventName: evName, - Func: f, + id: uuid.NewV1(), + EventName: evName, + Func: f, + PrivateData: privData, } xs.sockEvents[evName] = append(xs.sockEvents[evName], c) @@ -379,23 +356,25 @@ func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error { } // ProjectToFolder Convert Project structure to Folder structure -func (xs *XdsServer) ProjectToFolder(pPrj apiv1.ProjectConfig) *XdsFolderConfig { +func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig { stID := "" - if pPrj.Type == XdsTypeCloudSync { + if pPrj.Type == xsapiv1.TypeCloudSync { stID, _ = xs.SThg.IDGet() } - fPrj := XdsFolderConfig{ + // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/) + fPrj := xsapiv1.FolderConfig{ ID: pPrj.ID, Label: pPrj.Label, ClientPath: pPrj.ClientPath, - Type: XdsFolderType(pPrj.Type), + Type: xsapiv1.FolderType(pPrj.Type), Status: pPrj.Status, IsInSync: pPrj.IsInSync, DefaultSdk: pPrj.DefaultSdk, - DataPathMap: XdsPathMapConfig{ + ClientData: pPrj.ClientData, + DataPathMap: xsapiv1.PathMapConfig{ ServerPath: pPrj.ServerPath, }, - DataCloudSync: XdsCloudSyncConfig{ + DataCloudSync: xsapiv1.CloudSyncConfig{ SyncThingID: stID, STLocIsInSync: pPrj.IsInSync, STLocStatus: pPrj.Status, @@ -408,43 +387,71 @@ func (xs *XdsServer) ProjectToFolder(pPrj apiv1.ProjectConfig) *XdsFolderConfig } // FolderToProject Convert Folder structure to Project structure -func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) apiv1.ProjectConfig { +func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig { inSync := fPrj.IsInSync sts := fPrj.Status - if fPrj.Type == XdsTypeCloudSync { + if fPrj.Type == xsapiv1.TypeCloudSync { inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync sts = fPrj.DataCloudSync.STSvrStatus switch fPrj.DataCloudSync.STLocStatus { - case apiv1.StatusErrorConfig, apiv1.StatusDisable, apiv1.StatusPause: + case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause: sts = fPrj.DataCloudSync.STLocStatus break - case apiv1.StatusSyncing: - if sts != apiv1.StatusErrorConfig && sts != apiv1.StatusDisable && sts != apiv1.StatusPause { - sts = apiv1.StatusSyncing + case xaapiv1.StatusSyncing: + if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause { + sts = xaapiv1.StatusSyncing } break - case apiv1.StatusEnable: + case xaapiv1.StatusEnable: // keep STSvrStatus break } } - pPrj := apiv1.ProjectConfig{ + pPrj := xaapiv1.ProjectConfig{ ID: fPrj.ID, ServerID: xs.ID, Label: fPrj.Label, ClientPath: fPrj.ClientPath, ServerPath: fPrj.DataPathMap.ServerPath, - Type: apiv1.ProjectType(fPrj.Type), + Type: xaapiv1.ProjectType(fPrj.Type), Status: sts, IsInSync: inSync, DefaultSdk: fPrj.DefaultSdk, + ClientData: fPrj.ClientData, } return pPrj } +// CommandAdd Add a new command to the list of running commands +func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error { + if xs.CommandGet(cmdID) != nil { + return fmt.Errorf("command id already exist") + } + xs.cmdList[cmdID] = data + return nil +} + +// CommandDelete Delete a command from the command list +func (xs *XdsServer) CommandDelete(cmdID string) error { + if xs.CommandGet(cmdID) == nil { + return fmt.Errorf("unknown command id") + } + delete(xs.cmdList, cmdID) + return nil +} + +// CommandGet Retrieve a command data +func (xs *XdsServer) CommandGet(cmdID string) interface{} { + d, exist := xs.cmdList[cmdID] + if exist { + return d + } + return nil +} + /*** ** Private functions ***/ @@ -478,24 +485,6 @@ func (xs *XdsServer) _CreateConnectHTTP() error { return nil } -// _HTTPGet . -func (xs *XdsServer) _HTTPGet(url string, data interface{}) error { - var dd []byte - if err := xs.client.HTTPGet(url, &dd); err != nil { - return err - } - return json.Unmarshal(dd, &data) -} - -// _HTTPPost . -func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) { - body, err := json.Marshal(data) - if err != nil { - return nil, err - } - return xs.client.HTTPPostWithRes(url, string(body)) -} - // Re-established connection func (xs *XdsServer) _reconnect() error { err := xs._connect(true) @@ -509,8 +498,8 @@ func (xs *XdsServer) _reconnect() error { // Established HTTP and WS connection and retrieve XDSServer config func (xs *XdsServer) _connect(reConn bool) error { - xdsCfg := XdsServerConfig{} - if err := xs._HTTPGet("/config", &xdsCfg); err != nil { + xdsCfg := xsapiv1.APIConfig{} + if err := xs.client.Get("/config", &xdsCfg); err != nil { xs.Connected = false if !reConn { xs._NotifyState() @@ -518,12 +507,12 @@ func (xs *XdsServer) _connect(reConn bool) error { return err } - if reConn && xs.ID != xdsCfg.ID { - xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID) + if reConn && xs.ID != xdsCfg.ServerUID { + xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID) } // Update local XDS config - xs.ID = xdsCfg.ID + xs.ID = xdsCfg.ServerUID xs.ServerConfig = &xdsCfg // Establish WS connection and register listen @@ -604,7 +593,7 @@ func (xs *XdsServer) _SocketConnect() error { // Send event to notify changes func (xs *XdsServer) _NotifyState() { - evSts := apiv1.ServerCfg{ + evSts := xaapiv1.ServerCfg{ ID: xs.ID, URL: xs.BaseURL, APIURL: xs.APIURL, @@ -612,7 +601,7 @@ func (xs *XdsServer) _NotifyState() { ConnRetry: xs.ConnRetry, Connected: xs.Connected, } - if err := xs.events.Emit(apiv1.EVTServerConfig, evSts); err != nil { + if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil { xs.Log.Warningf("Cannot notify XdsServer state change: %v", err) } }