Fixed /exec input stream and /signal.
[src/xds/xds-agent.git] / lib / agent / xdsserver.go
index c900c9e..bca4b66 100644 (file)
@@ -11,8 +11,10 @@ import (
        "time"
 
        "github.com/gin-gonic/gin"
+       "github.com/iotbzh/xds-agent/lib/xaapiv1"
        "github.com/iotbzh/xds-agent/lib/xdsconfig"
        common "github.com/iotbzh/xds-common/golib"
+       "github.com/iotbzh/xds-server/lib/xsapiv1"
        uuid "github.com/satori/go.uuid"
        sio_client "github.com/sebd71/go-socket.io-client"
 )
@@ -27,7 +29,7 @@ type XdsServer struct {
        ConnRetry    int
        Connected    bool
        Disabled     bool
-       ServerConfig *XdsServerConfig
+       ServerConfig *xsapiv1.APIConfig
 
        // Events management
        CBOnError      func(error)
@@ -40,80 +42,18 @@ type XdsServer struct {
        ioSock    *sio_client.Client
        logOut    io.Writer
        apiRouter *gin.RouterGroup
+       cmdList   map[string]interface{}
 }
 
-// XdsServerConfig Data return by GET /config
-type XdsServerConfig struct {
-       ID               string           `json:"id"`
-       Version          string           `json:"version"`
-       APIVersion       string           `json:"apiVersion"`
-       VersionGitTag    string           `json:"gitTag"`
-       SupportedSharing map[string]bool  `json:"supportedSharing"`
-       Builder          XdsBuilderConfig `json:"builder"`
-}
-
-// XdsBuilderConfig represents the builder container configuration
-type XdsBuilderConfig struct {
-       IP          string `json:"ip"`
-       Port        string `json:"port"`
-       SyncThingID string `json:"syncThingID"`
-}
-
-// XdsFolderType XdsServer folder type
-type XdsFolderType string
-
-const (
-       XdsTypePathMap   = "PathMap"
-       XdsTypeCloudSync = "CloudSync"
-       XdsTypeCifsSmb   = "CIFS"
-)
-
-// XdsFolderConfig XdsServer folder config
-type XdsFolderConfig struct {
-       ID         string        `json:"id"`
-       Label      string        `json:"label"`
-       ClientPath string        `json:"path"`
-       Type       XdsFolderType `json:"type"`
-       Status     string        `json:"status"`
-       IsInSync   bool          `json:"isInSync"`
-       DefaultSdk string        `json:"defaultSdk"`
-       // Specific data depending on which Type is used
-       DataPathMap   XdsPathMapConfig   `json:"dataPathMap,omitempty"`
-       DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"`
-}
-
-// XdsPathMapConfig Path mapping specific data
-type XdsPathMapConfig struct {
-       ServerPath string `json:"serverPath"`
-}
-
-// XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data
-type XdsCloudSyncConfig struct {
-       SyncThingID   string `json:"syncThingID"`
-       STSvrStatus   string `json:"-"`
-       STSvrIsInSync bool   `json:"-"`
-       STLocStatus   string `json:"-"`
-       STLocIsInSync bool   `json:"-"`
-}
-
-// XdsEventRegisterArgs arguments used to register to XDS server events
-type XdsEventRegisterArgs struct {
-       Name      string `json:"name"`
-       ProjectID string `json:"filterProjectID"`
-}
-
-// XdsEventFolderChange Folder change event structure
-type XdsEventFolderChange struct {
-       Time   string          `json:"time"`
-       Type   string          `json:"type"`
-       Folder XdsFolderConfig `json:"folder"`
-}
+// EventCB Event emitter callback
+type EventCB func(privData interface{}, evtData interface{}) error
 
 // caller Used to chain event listeners
 type caller struct {
-       id        uuid.UUID
-       EventName string
-       Func      func(interface{})
+       id          uuid.UUID
+       EventName   string
+       Func        EventCB
+       PrivateData interface{}
 }
 
 const _IDTempoPrefix = "tempo-"
@@ -133,6 +73,7 @@ func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
                sockEvents:     make(map[string][]*caller),
                sockEventsLock: &sync.Mutex{},
                logOut:         ctx.Log.Out,
+               cmdList:        make(map[string]interface{}),
        }
 }
 
@@ -191,47 +132,56 @@ func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
 }
 
 // SendCommand Send a command to XDS Server
-func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) {
+func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error {
        url := cmd
        if !strings.HasPrefix("/", cmd) {
                url = "/" + cmd
        }
-       return xs.client.HTTPPostWithRes(url, string(body))
+       return xs.client.Post(url, string(body), res)
 }
 
 // GetVersion Send Get request to retrieve XDS Server version
 func (xs *XdsServer) GetVersion(res interface{}) error {
-       return xs._HTTPGet("/version", &res)
+       return xs.client.Get("/version", &res)
 }
 
 // GetFolders Send GET request to get current folder configuration
-func (xs *XdsServer) GetFolders(prjs *[]XdsFolderConfig) error {
-       return xs._HTTPGet("/folders", prjs)
+func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error {
+       return xs.client.Get("/folders", folders)
 }
 
 // FolderAdd Send POST request to add a folder
-func (xs *XdsServer) FolderAdd(prj *XdsFolderConfig, res interface{}) error {
-       response, err := xs._HTTPPost("/folder", prj)
+func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error {
+       err := xs.client.Post("/folders", fld, res)
        if err != nil {
-               return err
+               return fmt.Errorf("FolderAdd error: %s", err.Error())
        }
-       if response.StatusCode != 200 {
-               return fmt.Errorf("FolderAdd error status=%s", response.Status)
-       }
-       // Result is a XdsFolderConfig that is equivalent to ProjectConfig
-       err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
-
        return err
 }
 
 // FolderDelete Send DELETE request to delete a folder
 func (xs *XdsServer) FolderDelete(id string) error {
-       return xs.client.HTTPDelete("/folder/" + id)
+       return xs.client.HTTPDelete("/folders/" + id)
 }
 
 // FolderSync Send POST request to force synchronization of a folder
 func (xs *XdsServer) FolderSync(id string) error {
-       return xs.client.HTTPPost("/folder/sync/"+id, "")
+       return xs.client.HTTPPost("/folders/sync/"+id, "")
+}
+
+// FolderUpdate Send PUT request to update a folder
+func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error {
+       return xs.client.Put("/folders/"+fld.ID, fld, resFld)
+}
+
+// CommandExec Send POST request to execute a command
+func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error {
+       return xs.client.Post("/exec", args, res)
+}
+
+// CommandSignal Send POST request to send a signal to a command
+func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error {
+       return xs.client.Post("/signal", args, res)
 }
 
 // SetAPIRouterGroup .
@@ -248,7 +198,13 @@ func (xs *XdsServer) PassthroughGet(url string) {
 
        xs.apiRouter.GET(url, func(c *gin.Context) {
                var data interface{}
-               if err := xs._HTTPGet(url, &data); err != nil {
+               // Take care of param (eg. id in /projects/:id)
+               nURL := url
+               if strings.Contains(url, ":") {
+                       nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
+               }
+               // Send Get request
+               if err := xs.client.Get(nURL, &data); err != nil {
                        if strings.Contains(err.Error(), "connection refused") {
                                xs.Connected = false
                                xs._NotifyState()
@@ -276,11 +232,25 @@ func (xs *XdsServer) PassthroughPost(url string) {
                        return
                }
 
-               response, err := xs._HTTPPost(url, bodyReq[:n])
+               // Take care of param (eg. id in /projects/:id)
+               nURL := url
+               if strings.Contains(url, ":") {
+                       nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
+               }
+
+               // Send Post request
+               body, err := json.Marshal(bodyReq[:n])
+               if err != nil {
+                       common.APIError(c, err.Error())
+                       return
+               }
+
+               response, err := xs.client.HTTPPostWithRes(nURL, string(body))
                if err != nil {
                        common.APIError(c, err.Error())
                        return
                }
+
                bodyRes, err := ioutil.ReadAll(response.Body)
                if err != nil {
                        common.APIError(c, "Cannot read response body")
@@ -292,16 +262,26 @@ func (xs *XdsServer) PassthroughPost(url string) {
 
 // EventRegister Post a request to register to an XdsServer event
 func (xs *XdsServer) EventRegister(evName string, id string) error {
-       var err error
-       _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{
-               Name:      evName,
-               ProjectID: id,
-       })
-       return err
+       return xs.client.Post(
+               "/events/register",
+               xsapiv1.EventRegisterArgs{
+                       Name:      evName,
+                       ProjectID: id,
+               },
+               nil)
+}
+
+// EventEmit Emit a event to XDS Server through WS
+func (xs *XdsServer) EventEmit(message string, args ...interface{}) error {
+       if xs.ioSock == nil {
+               return fmt.Errorf("Io.Socket not initialized")
+       }
+
+       return xs.ioSock.Emit(message, args...)
 }
 
 // EventOn Register a callback on events reception
-func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, error) {
+func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
        if xs.ioSock == nil {
                return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
        }
@@ -313,19 +293,30 @@ func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, err
                // Register listener only the first time
                evn := evName
 
-               // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange
+               // FIXME: use generic type: data interface{} instead of data xsapiv1.EventMsg
                var err error
-               if evName == "event:FolderStateChanged" {
-                       err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
+               if evName == "event:folder-state-change" {
+                       err = xs.ioSock.On(evn, func(data xsapiv1.EventMsg) error {
                                xs.sockEventsLock.Lock()
-                               defer xs.sockEventsLock.Unlock()
-                               for _, c := range xs.sockEvents[evn] {
-                                       c.Func(data)
+                               sEvts := make([]*caller, len(xs.sockEvents[evn]))
+                               copy(sEvts, xs.sockEvents[evn])
+                               xs.sockEventsLock.Unlock()
+                               for _, c := range sEvts {
+                                       c.Func(c.PrivateData, data)
                                }
                                return nil
                        })
                } else {
-                       err = xs.ioSock.On(evn, f)
+                       err = xs.ioSock.On(evn, func(data interface{}) error {
+                               xs.sockEventsLock.Lock()
+                               sEvts := make([]*caller, len(xs.sockEvents[evn]))
+                               copy(sEvts, xs.sockEvents[evn])
+                               xs.sockEventsLock.Unlock()
+                               for _, c := range sEvts {
+                                       c.Func(c.PrivateData, data)
+                               }
+                               return nil
+                       })
                }
                if err != nil {
                        return uuid.Nil, err
@@ -333,9 +324,10 @@ func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, err
        }
 
        c := &caller{
-               id:        uuid.NewV1(),
-               EventName: evName,
-               Func:      f,
+               id:          uuid.NewV1(),
+               EventName:   evName,
+               Func:        f,
+               PrivateData: privData,
        }
 
        xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
@@ -364,23 +356,25 @@ func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
 }
 
 // ProjectToFolder Convert Project structure to Folder structure
-func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *XdsFolderConfig {
+func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig {
        stID := ""
-       if pPrj.Type == XdsTypeCloudSync {
+       if pPrj.Type == xsapiv1.TypeCloudSync {
                stID, _ = xs.SThg.IDGet()
        }
-       fPrj := XdsFolderConfig{
+       // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
+       fPrj := xsapiv1.FolderConfig{
                ID:         pPrj.ID,
                Label:      pPrj.Label,
                ClientPath: pPrj.ClientPath,
-               Type:       XdsFolderType(pPrj.Type),
+               Type:       xsapiv1.FolderType(pPrj.Type),
                Status:     pPrj.Status,
                IsInSync:   pPrj.IsInSync,
                DefaultSdk: pPrj.DefaultSdk,
-               DataPathMap: XdsPathMapConfig{
+               ClientData: pPrj.ClientData,
+               DataPathMap: xsapiv1.PathMapConfig{
                        ServerPath: pPrj.ServerPath,
                },
-               DataCloudSync: XdsCloudSyncConfig{
+               DataCloudSync: xsapiv1.CloudSyncConfig{
                        SyncThingID:   stID,
                        STLocIsInSync: pPrj.IsInSync,
                        STLocStatus:   pPrj.Status,
@@ -393,43 +387,71 @@ func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *XdsFolderConfig {
 }
 
 // FolderToProject Convert Folder structure to Project structure
-func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) ProjectConfig {
+func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig {
        inSync := fPrj.IsInSync
        sts := fPrj.Status
 
-       if fPrj.Type == XdsTypeCloudSync {
+       if fPrj.Type == xsapiv1.TypeCloudSync {
                inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
 
                sts = fPrj.DataCloudSync.STSvrStatus
                switch fPrj.DataCloudSync.STLocStatus {
-               case StatusErrorConfig, StatusDisable, StatusPause:
+               case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause:
                        sts = fPrj.DataCloudSync.STLocStatus
                        break
-               case StatusSyncing:
-                       if sts != StatusErrorConfig && sts != StatusDisable && sts != StatusPause {
-                               sts = StatusSyncing
+               case xaapiv1.StatusSyncing:
+                       if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause {
+                               sts = xaapiv1.StatusSyncing
                        }
                        break
-               case StatusEnable:
+               case xaapiv1.StatusEnable:
                        // keep STSvrStatus
                        break
                }
        }
 
-       pPrj := ProjectConfig{
+       pPrj := xaapiv1.ProjectConfig{
                ID:         fPrj.ID,
                ServerID:   xs.ID,
                Label:      fPrj.Label,
                ClientPath: fPrj.ClientPath,
                ServerPath: fPrj.DataPathMap.ServerPath,
-               Type:       ProjectType(fPrj.Type),
+               Type:       xaapiv1.ProjectType(fPrj.Type),
                Status:     sts,
                IsInSync:   inSync,
                DefaultSdk: fPrj.DefaultSdk,
+               ClientData: fPrj.ClientData,
        }
        return pPrj
 }
 
+// CommandAdd Add a new command to the list of running commands
+func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error {
+       if xs.CommandGet(cmdID) != nil {
+               return fmt.Errorf("command id already exist")
+       }
+       xs.cmdList[cmdID] = data
+       return nil
+}
+
+// CommandDelete Delete a command from the command list
+func (xs *XdsServer) CommandDelete(cmdID string) error {
+       if xs.CommandGet(cmdID) == nil {
+               return fmt.Errorf("unknown command id")
+       }
+       delete(xs.cmdList, cmdID)
+       return nil
+}
+
+// CommandGet Retrieve a command data
+func (xs *XdsServer) CommandGet(cmdID string) interface{} {
+       d, exist := xs.cmdList[cmdID]
+       if exist {
+               return d
+       }
+       return nil
+}
+
 /***
 ** Private functions
 ***/
@@ -463,24 +485,6 @@ func (xs *XdsServer) _CreateConnectHTTP() error {
        return nil
 }
 
-// _HTTPGet .
-func (xs *XdsServer) _HTTPGet(url string, data interface{}) error {
-       var dd []byte
-       if err := xs.client.HTTPGet(url, &dd); err != nil {
-               return err
-       }
-       return json.Unmarshal(dd, &data)
-}
-
-// _HTTPPost .
-func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) {
-       body, err := json.Marshal(data)
-       if err != nil {
-               return nil, err
-       }
-       return xs.client.HTTPPostWithRes(url, string(body))
-}
-
 //  Re-established connection
 func (xs *XdsServer) _reconnect() error {
        err := xs._connect(true)
@@ -494,8 +498,8 @@ func (xs *XdsServer) _reconnect() error {
 //  Established HTTP and WS connection and retrieve XDSServer config
 func (xs *XdsServer) _connect(reConn bool) error {
 
-       xdsCfg := XdsServerConfig{}
-       if err := xs._HTTPGet("/config", &xdsCfg); err != nil {
+       xdsCfg := xsapiv1.APIConfig{}
+       if err := xs.client.Get("/config", &xdsCfg); err != nil {
                xs.Connected = false
                if !reConn {
                        xs._NotifyState()
@@ -503,12 +507,12 @@ func (xs *XdsServer) _connect(reConn bool) error {
                return err
        }
 
-       if reConn && xs.ID != xdsCfg.ID {
-               xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
+       if reConn && xs.ID != xdsCfg.ServerUID {
+               xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID)
        }
 
        // Update local XDS config
-       xs.ID = xdsCfg.ID
+       xs.ID = xdsCfg.ServerUID
        xs.ServerConfig = &xdsCfg
 
        // Establish WS connection and register listen
@@ -589,7 +593,7 @@ func (xs *XdsServer) _SocketConnect() error {
 // Send event to notify changes
 func (xs *XdsServer) _NotifyState() {
 
-       evSts := ServerCfg{
+       evSts := xaapiv1.ServerCfg{
                ID:         xs.ID,
                URL:        xs.BaseURL,
                APIURL:     xs.APIURL,
@@ -597,7 +601,7 @@ func (xs *XdsServer) _NotifyState() {
                ConnRetry:  xs.ConnRetry,
                Connected:  xs.Connected,
        }
-       if err := xs.events.Emit(EVTServerConfig, evSts); err != nil {
+       if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil {
                xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
        }
 }