+/*
+ * Copyright (C) 2017 "IoT.bzh"
+ * Author Sebastien Douheret <sebastien@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package agent
import (
"time"
"github.com/gin-gonic/gin"
- "github.com/iotbzh/xds-agent/lib/apiv1"
+ "github.com/iotbzh/xds-agent/lib/xaapiv1"
"github.com/iotbzh/xds-agent/lib/xdsconfig"
common "github.com/iotbzh/xds-common/golib"
+ "github.com/iotbzh/xds-server/lib/xsapiv1"
uuid "github.com/satori/go.uuid"
sio_client "github.com/sebd71/go-socket.io-client"
)
ConnRetry int
Connected bool
Disabled bool
- ServerConfig *XdsServerConfig
+ ServerConfig *xsapiv1.APIConfig
// Events management
CBOnError func(error)
ioSock *sio_client.Client
logOut io.Writer
apiRouter *gin.RouterGroup
+ cmdList map[string]interface{}
}
-// XdsServerConfig Data return by GET /config
-type XdsServerConfig struct {
- ID string `json:"id"`
- Version string `json:"version"`
- APIVersion string `json:"apiVersion"`
- VersionGitTag string `json:"gitTag"`
- SupportedSharing map[string]bool `json:"supportedSharing"`
- Builder XdsBuilderConfig `json:"builder"`
-}
-
-// XdsBuilderConfig represents the builder container configuration
-type XdsBuilderConfig struct {
- IP string `json:"ip"`
- Port string `json:"port"`
- SyncThingID string `json:"syncThingID"`
-}
-
-// XdsFolderType XdsServer folder type
-type XdsFolderType string
-
-const (
- XdsTypePathMap = "PathMap"
- XdsTypeCloudSync = "CloudSync"
- XdsTypeCifsSmb = "CIFS"
-)
-
-// XdsFolderConfig XdsServer folder config
-type XdsFolderConfig struct {
- ID string `json:"id"`
- Label string `json:"label"`
- ClientPath string `json:"path"`
- Type XdsFolderType `json:"type"`
- Status string `json:"status"`
- IsInSync bool `json:"isInSync"`
- DefaultSdk string `json:"defaultSdk"`
- // Specific data depending on which Type is used
- DataPathMap XdsPathMapConfig `json:"dataPathMap,omitempty"`
- DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"`
-}
-
-// XdsPathMapConfig Path mapping specific data
-type XdsPathMapConfig struct {
- ServerPath string `json:"serverPath"`
- CheckFile string `json:"checkFile"`
- CheckContent string `json:"checkContent"`
-}
-
-// XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data
-type XdsCloudSyncConfig struct {
- SyncThingID string `json:"syncThingID"`
- STSvrStatus string `json:"-"`
- STSvrIsInSync bool `json:"-"`
- STLocStatus string `json:"-"`
- STLocIsInSync bool `json:"-"`
-}
-
-// XdsEventRegisterArgs arguments used to register to XDS server events
-type XdsEventRegisterArgs struct {
- Name string `json:"name"`
- ProjectID string `json:"filterProjectID"`
-}
-
-// XdsEventFolderChange Folder change event structure
-type XdsEventFolderChange struct {
- Time string `json:"time"`
- Type string `json:"type"`
- Folder XdsFolderConfig `json:"folder"`
-}
+// EventCB Event emitter callback
+type EventCB func(privData interface{}, evtData interface{}) error
// caller Used to chain event listeners
type caller struct {
- id uuid.UUID
- EventName string
- Func func(interface{})
+ id uuid.UUID
+ EventName string
+ Func EventCB
+ PrivateData interface{}
}
const _IDTempoPrefix = "tempo-"
sockEvents: make(map[string][]*caller),
sockEventsLock: &sync.Mutex{},
logOut: ctx.Log.Out,
+ cmdList: make(map[string]interface{}),
}
}
}
// SendCommand Send a command to XDS Server
-func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) {
+func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error {
url := cmd
if !strings.HasPrefix("/", cmd) {
url = "/" + cmd
}
- return xs.client.HTTPPostWithRes(url, string(body))
+ return xs.client.Post(url, string(body), res)
}
// GetVersion Send Get request to retrieve XDS Server version
func (xs *XdsServer) GetVersion(res interface{}) error {
- return xs._HTTPGet("/version", &res)
+ return xs.client.Get("/version", &res)
}
// GetFolders Send GET request to get current folder configuration
-func (xs *XdsServer) GetFolders(folders *[]XdsFolderConfig) error {
- return xs._HTTPGet("/folders", folders)
+func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error {
+ return xs.client.Get("/folders", folders)
}
// FolderAdd Send POST request to add a folder
-func (xs *XdsServer) FolderAdd(fld *XdsFolderConfig, res interface{}) error {
- response, err := xs._HTTPPost("/folders", fld)
+func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error {
+ err := xs.client.Post("/folders", fld, res)
if err != nil {
- return err
- }
- if response.StatusCode != 200 {
- return fmt.Errorf("FolderAdd error status=%s", response.Status)
+ return fmt.Errorf("FolderAdd error: %s", err.Error())
}
- // Result is a XdsFolderConfig that is equivalent to ProjectConfig
- err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
-
return err
}
return xs.client.HTTPPost("/folders/sync/"+id, "")
}
+// FolderUpdate Send PUT request to update a folder
+func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error {
+ return xs.client.Put("/folders/"+fld.ID, fld, resFld)
+}
+
+// CommandExec Send POST request to execute a command
+func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error {
+ return xs.client.Post("/exec", args, res)
+}
+
+// CommandSignal Send POST request to send a signal to a command
+func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error {
+ return xs.client.Post("/signal", args, res)
+}
+
// SetAPIRouterGroup .
func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
xs.apiRouter = r
nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
}
// Send Get request
- if err := xs._HTTPGet(nURL, &data); err != nil {
+ if err := xs.client.Get(nURL, &data); err != nil {
if strings.Contains(err.Error(), "connection refused") {
xs.Connected = false
xs._NotifyState()
if strings.Contains(url, ":") {
nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
}
+
// Send Post request
- response, err := xs._HTTPPost(nURL, bodyReq[:n])
+ body, err := json.Marshal(bodyReq[:n])
+ if err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+
+ response, err := xs.client.HTTPPostWithRes(nURL, string(body))
if err != nil {
common.APIError(c, err.Error())
return
}
+
bodyRes, err := ioutil.ReadAll(response.Body)
if err != nil {
common.APIError(c, "Cannot read response body")
// EventRegister Post a request to register to an XdsServer event
func (xs *XdsServer) EventRegister(evName string, id string) error {
- var err error
- _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{
- Name: evName,
- ProjectID: id,
- })
- return err
+ return xs.client.Post(
+ "/events/register",
+ xsapiv1.EventRegisterArgs{
+ Name: evName,
+ ProjectID: id,
+ },
+ nil)
+}
+
+// EventEmit Emit a event to XDS Server through WS
+func (xs *XdsServer) EventEmit(message string, args ...interface{}) error {
+ if xs.ioSock == nil {
+ return fmt.Errorf("Io.Socket not initialized")
+ }
+
+ return xs.ioSock.Emit(message, args...)
}
// EventOn Register a callback on events reception
-func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, error) {
+func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
if xs.ioSock == nil {
return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
}
// Register listener only the first time
evn := evName
- // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange
+ // FIXME: use generic type: data interface{} instead of data xsapiv1.EventMsg
var err error
- if evName == "event:FolderStateChanged" {
- err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
+ if evName == "event:folder-state-change" {
+ err = xs.ioSock.On(evn, func(data xsapiv1.EventMsg) error {
xs.sockEventsLock.Lock()
- defer xs.sockEventsLock.Unlock()
- for _, c := range xs.sockEvents[evn] {
- c.Func(data)
+ sEvts := make([]*caller, len(xs.sockEvents[evn]))
+ copy(sEvts, xs.sockEvents[evn])
+ xs.sockEventsLock.Unlock()
+ for _, c := range sEvts {
+ c.Func(c.PrivateData, data)
}
return nil
})
} else {
- err = xs.ioSock.On(evn, f)
+ err = xs.ioSock.On(evn, func(data interface{}) error {
+ xs.sockEventsLock.Lock()
+ sEvts := make([]*caller, len(xs.sockEvents[evn]))
+ copy(sEvts, xs.sockEvents[evn])
+ xs.sockEventsLock.Unlock()
+ for _, c := range sEvts {
+ c.Func(c.PrivateData, data)
+ }
+ return nil
+ })
}
if err != nil {
return uuid.Nil, err
}
c := &caller{
- id: uuid.NewV1(),
- EventName: evName,
- Func: f,
+ id: uuid.NewV1(),
+ EventName: evName,
+ Func: f,
+ PrivateData: privData,
}
xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
}
// ProjectToFolder Convert Project structure to Folder structure
-func (xs *XdsServer) ProjectToFolder(pPrj apiv1.ProjectConfig) *XdsFolderConfig {
+func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig {
stID := ""
- if pPrj.Type == XdsTypeCloudSync {
+ if pPrj.Type == xsapiv1.TypeCloudSync {
stID, _ = xs.SThg.IDGet()
}
- fPrj := XdsFolderConfig{
+ // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
+ fPrj := xsapiv1.FolderConfig{
ID: pPrj.ID,
Label: pPrj.Label,
ClientPath: pPrj.ClientPath,
- Type: XdsFolderType(pPrj.Type),
+ Type: xsapiv1.FolderType(pPrj.Type),
Status: pPrj.Status,
IsInSync: pPrj.IsInSync,
DefaultSdk: pPrj.DefaultSdk,
- DataPathMap: XdsPathMapConfig{
+ ClientData: pPrj.ClientData,
+ DataPathMap: xsapiv1.PathMapConfig{
ServerPath: pPrj.ServerPath,
},
- DataCloudSync: XdsCloudSyncConfig{
+ DataCloudSync: xsapiv1.CloudSyncConfig{
SyncThingID: stID,
STLocIsInSync: pPrj.IsInSync,
STLocStatus: pPrj.Status,
}
// FolderToProject Convert Folder structure to Project structure
-func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) apiv1.ProjectConfig {
+func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig {
inSync := fPrj.IsInSync
sts := fPrj.Status
- if fPrj.Type == XdsTypeCloudSync {
+ if fPrj.Type == xsapiv1.TypeCloudSync {
inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
sts = fPrj.DataCloudSync.STSvrStatus
switch fPrj.DataCloudSync.STLocStatus {
- case apiv1.StatusErrorConfig, apiv1.StatusDisable, apiv1.StatusPause:
+ case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause:
sts = fPrj.DataCloudSync.STLocStatus
break
- case apiv1.StatusSyncing:
- if sts != apiv1.StatusErrorConfig && sts != apiv1.StatusDisable && sts != apiv1.StatusPause {
- sts = apiv1.StatusSyncing
+ case xaapiv1.StatusSyncing:
+ if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause {
+ sts = xaapiv1.StatusSyncing
}
break
- case apiv1.StatusEnable:
+ case xaapiv1.StatusEnable:
// keep STSvrStatus
break
}
}
- pPrj := apiv1.ProjectConfig{
+ pPrj := xaapiv1.ProjectConfig{
ID: fPrj.ID,
ServerID: xs.ID,
Label: fPrj.Label,
ClientPath: fPrj.ClientPath,
ServerPath: fPrj.DataPathMap.ServerPath,
- Type: apiv1.ProjectType(fPrj.Type),
+ Type: xaapiv1.ProjectType(fPrj.Type),
Status: sts,
IsInSync: inSync,
DefaultSdk: fPrj.DefaultSdk,
+ ClientData: fPrj.ClientData,
}
return pPrj
}
+// CommandAdd Add a new command to the list of running commands
+func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error {
+ if xs.CommandGet(cmdID) != nil {
+ return fmt.Errorf("command id already exist")
+ }
+ xs.cmdList[cmdID] = data
+ return nil
+}
+
+// CommandDelete Delete a command from the command list
+func (xs *XdsServer) CommandDelete(cmdID string) error {
+ if xs.CommandGet(cmdID) == nil {
+ return fmt.Errorf("unknown command id")
+ }
+ delete(xs.cmdList, cmdID)
+ return nil
+}
+
+// CommandGet Retrieve a command data
+func (xs *XdsServer) CommandGet(cmdID string) interface{} {
+ d, exist := xs.cmdList[cmdID]
+ if exist {
+ return d
+ }
+ return nil
+}
+
/***
** Private functions
***/
return nil
}
-// _HTTPGet .
-func (xs *XdsServer) _HTTPGet(url string, data interface{}) error {
- var dd []byte
- if err := xs.client.HTTPGet(url, &dd); err != nil {
- return err
- }
- return json.Unmarshal(dd, &data)
-}
-
-// _HTTPPost .
-func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) {
- body, err := json.Marshal(data)
- if err != nil {
- return nil, err
- }
- return xs.client.HTTPPostWithRes(url, string(body))
-}
-
// Re-established connection
func (xs *XdsServer) _reconnect() error {
err := xs._connect(true)
// Established HTTP and WS connection and retrieve XDSServer config
func (xs *XdsServer) _connect(reConn bool) error {
- xdsCfg := XdsServerConfig{}
- if err := xs._HTTPGet("/config", &xdsCfg); err != nil {
+ xdsCfg := xsapiv1.APIConfig{}
+ if err := xs.client.Get("/config", &xdsCfg); err != nil {
xs.Connected = false
if !reConn {
xs._NotifyState()
return err
}
- if reConn && xs.ID != xdsCfg.ID {
- xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
+ if reConn && xs.ID != xdsCfg.ServerUID {
+ xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID)
}
// Update local XDS config
- xs.ID = xdsCfg.ID
+ xs.ID = xdsCfg.ServerUID
xs.ServerConfig = &xdsCfg
// Establish WS connection and register listen
// Send event to notify changes
func (xs *XdsServer) _NotifyState() {
- evSts := apiv1.ServerCfg{
+ evSts := xaapiv1.ServerCfg{
ID: xs.ID,
URL: xs.BaseURL,
APIURL: xs.APIURL,
ConnRetry: xs.ConnRetry,
Connected: xs.Connected,
}
- if err := xs.events.Emit(apiv1.EVTServerConfig, evSts); err != nil {
+ if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil {
xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
}
}