Fixed /exec input stream and /signal.
[src/xds/xds-agent.git] / lib / agent / xdsserver.go
index 518c68b..bca4b66 100644 (file)
@@ -7,16 +7,19 @@ import (
        "io/ioutil"
        "net/http"
        "strings"
+       "sync"
        "time"
 
        "github.com/gin-gonic/gin"
+       "github.com/iotbzh/xds-agent/lib/xaapiv1"
        "github.com/iotbzh/xds-agent/lib/xdsconfig"
        common "github.com/iotbzh/xds-common/golib"
+       "github.com/iotbzh/xds-server/lib/xsapiv1"
        uuid "github.com/satori/go.uuid"
        sio_client "github.com/sebd71/go-socket.io-client"
 )
 
-// Server .
+// XdsServer .
 type XdsServer struct {
        *Context
        ID           string
@@ -26,67 +29,31 @@ type XdsServer struct {
        ConnRetry    int
        Connected    bool
        Disabled     bool
-       ServerConfig *xdsServerConfig
+       ServerConfig *xsapiv1.APIConfig
 
-       // callbacks
+       // Events management
        CBOnError      func(error)
        CBOnDisconnect func(error)
+       sockEvents     map[string][]*caller
+       sockEventsLock *sync.Mutex
 
        // Private fields
        client    *common.HTTPClient
        ioSock    *sio_client.Client
        logOut    io.Writer
        apiRouter *gin.RouterGroup
+       cmdList   map[string]interface{}
 }
 
-// xdsServerConfig Data return by GET /config
-type xdsServerConfig struct {
-       ID               string           `json:"id"`
-       Version          string           `json:"version"`
-       APIVersion       string           `json:"apiVersion"`
-       VersionGitTag    string           `json:"gitTag"`
-       SupportedSharing map[string]bool  `json:"supportedSharing"`
-       Builder          xdsBuilderConfig `json:"builder"`
-}
-
-// xdsBuilderConfig represents the builder container configuration
-type xdsBuilderConfig struct {
-       IP          string `json:"ip"`
-       Port        string `json:"port"`
-       SyncThingID string `json:"syncThingID"`
-}
-
-// FolderType XdsServer folder type
-type FolderType string
-
-const (
-       XdsTypePathMap   = "PathMap"
-       XdsTypeCloudSync = "CloudSync"
-       XdsTypeCifsSmb   = "CIFS"
-)
+// EventCB Event emitter callback
+type EventCB func(privData interface{}, evtData interface{}) error
 
-// FolderConfig XdsServer folder config
-type FolderConfig struct {
-       ID         string     `json:"id"`
-       Label      string     `json:"label"`
-       ClientPath string     `json:"path"`
-       Type       FolderType `json:"type"`
-       Status     string     `json:"status"`
-       IsInSync   bool       `json:"isInSync"`
-       DefaultSdk string     `json:"defaultSdk"`
-       // Specific data depending on which Type is used
-       DataPathMap   PathMapConfig   `json:"dataPathMap,omitempty"`
-       DataCloudSync CloudSyncConfig `json:"dataCloudSync,omitempty"`
-}
-
-// PathMapConfig Path mapping specific data
-type PathMapConfig struct {
-       ServerPath string `json:"serverPath"`
-}
-
-// CloudSyncConfig CloudSync (AKA Syncthing) specific data
-type CloudSyncConfig struct {
-       SyncThingID string `json:"syncThingID"`
+// caller Used to chain event listeners
+type caller struct {
+       id          uuid.UUID
+       EventName   string
+       Func        EventCB
+       PrivateData interface{}
 }
 
 const _IDTempoPrefix = "tempo-"
@@ -103,7 +70,10 @@ func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
                Connected:  false,
                Disabled:   false,
 
-               logOut: ctx.Log.Out,
+               sockEvents:     make(map[string][]*caller),
+               sockEventsLock: &sync.Mutex{},
+               logOut:         ctx.Log.Out,
+               cmdList:        make(map[string]interface{}),
        }
 }
 
@@ -138,7 +108,7 @@ func (xs *XdsServer) Connect() error {
                time.Sleep(time.Second)
        }
        if retry == 0 {
-               // FIXME SEB: re-use _reconnect to wait longer in background
+               // FIXME: re-use _reconnect to wait longer in background
                return fmt.Errorf("Connection to XDS Server failure")
        }
        if err != nil {
@@ -161,47 +131,57 @@ func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
        xs.logOut = out
 }
 
+// SendCommand Send a command to XDS Server
+func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error {
+       url := cmd
+       if !strings.HasPrefix("/", cmd) {
+               url = "/" + cmd
+       }
+       return xs.client.Post(url, string(body), res)
+}
+
+// GetVersion Send Get request to retrieve XDS Server version
+func (xs *XdsServer) GetVersion(res interface{}) error {
+       return xs.client.Get("/version", &res)
+}
+
+// GetFolders Send GET request to get current folder configuration
+func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error {
+       return xs.client.Get("/folders", folders)
+}
+
 // FolderAdd Send POST request to add a folder
-func (xs *XdsServer) FolderAdd(prj *FolderConfig, res interface{}) error {
-       response, err := xs.HTTPPost("/folder", prj)
+func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error {
+       err := xs.client.Post("/folders", fld, res)
        if err != nil {
-               return err
-       }
-       if response.StatusCode != 200 {
-               return fmt.Errorf("FolderAdd error status=%s", response.Status)
+               return fmt.Errorf("FolderAdd error: %s", err.Error())
        }
-       // Result is a FolderConfig that is equivalent to ProjectConfig
-       err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
-
        return err
 }
 
 // FolderDelete Send DELETE request to delete a folder
 func (xs *XdsServer) FolderDelete(id string) error {
-       return xs.client.HTTPDelete("/folder/" + id)
+       return xs.client.HTTPDelete("/folders/" + id)
 }
 
-// HTTPGet .
-func (xs *XdsServer) HTTPGet(url string, data interface{}) error {
-       var dd []byte
-       if err := xs.client.HTTPGet(url, &dd); err != nil {
-               return err
-       }
-       return json.Unmarshal(dd, &data)
+// FolderSync Send POST request to force synchronization of a folder
+func (xs *XdsServer) FolderSync(id string) error {
+       return xs.client.HTTPPost("/folders/sync/"+id, "")
 }
 
-// HTTPPost .
-func (xs *XdsServer) HTTPPost(url string, data interface{}) (*http.Response, error) {
-       body, err := json.Marshal(data)
-       if err != nil {
-               return nil, err
-       }
-       return xs.HTTPPostBody(url, string(body))
+// FolderUpdate Send PUT request to update a folder
+func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error {
+       return xs.client.Put("/folders/"+fld.ID, fld, resFld)
+}
+
+// CommandExec Send POST request to execute a command
+func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error {
+       return xs.client.Post("/exec", args, res)
 }
 
-// HTTPPostBody .
-func (xs *XdsServer) HTTPPostBody(url string, body string) (*http.Response, error) {
-       return xs.client.HTTPPostWithRes(url, body)
+// CommandSignal Send POST request to send a signal to a command
+func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error {
+       return xs.client.Post("/signal", args, res)
 }
 
 // SetAPIRouterGroup .
@@ -218,7 +198,13 @@ func (xs *XdsServer) PassthroughGet(url string) {
 
        xs.apiRouter.GET(url, func(c *gin.Context) {
                var data interface{}
-               if err := xs.HTTPGet(url, &data); err != nil {
+               // Take care of param (eg. id in /projects/:id)
+               nURL := url
+               if strings.Contains(url, ":") {
+                       nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
+               }
+               // Send Get request
+               if err := xs.client.Get(nURL, &data); err != nil {
                        if strings.Contains(err.Error(), "connection refused") {
                                xs.Connected = false
                                xs._NotifyState()
@@ -246,11 +232,25 @@ func (xs *XdsServer) PassthroughPost(url string) {
                        return
                }
 
-               response, err := xs.HTTPPostBody(url, string(bodyReq[:n]))
+               // Take care of param (eg. id in /projects/:id)
+               nURL := url
+               if strings.Contains(url, ":") {
+                       nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
+               }
+
+               // Send Post request
+               body, err := json.Marshal(bodyReq[:n])
                if err != nil {
                        common.APIError(c, err.Error())
                        return
                }
+
+               response, err := xs.client.HTTPPostWithRes(nURL, string(body))
+               if err != nil {
+                       common.APIError(c, err.Error())
+                       return
+               }
+
                bodyRes, err := ioutil.ReadAll(response.Body)
                if err != nil {
                        common.APIError(c, "Cannot read response body")
@@ -260,63 +260,198 @@ func (xs *XdsServer) PassthroughPost(url string) {
        })
 }
 
-// EventOn Register a callback on events reception
-func (xs *XdsServer) EventOn(message string, f interface{}) (err error) {
+// EventRegister Post a request to register to an XdsServer event
+func (xs *XdsServer) EventRegister(evName string, id string) error {
+       return xs.client.Post(
+               "/events/register",
+               xsapiv1.EventRegisterArgs{
+                       Name:      evName,
+                       ProjectID: id,
+               },
+               nil)
+}
+
+// EventEmit Emit a event to XDS Server through WS
+func (xs *XdsServer) EventEmit(message string, args ...interface{}) error {
        if xs.ioSock == nil {
                return fmt.Errorf("Io.Socket not initialized")
        }
-       // FIXME SEB: support chain / multiple listeners
-       /*      sockEvents     map[string][]*caller
+
+       return xs.ioSock.Emit(message, args...)
+}
+
+// EventOn Register a callback on events reception
+func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
+       if xs.ioSock == nil {
+               return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
+       }
+
        xs.sockEventsLock.Lock()
-       xs.sockEvents[message] = append(xs.sockEvents[message], f)
-       xs.sockEventsLock.Unlock()
-       xs.ioSock.On(message, func(ev) {
+       defer xs.sockEventsLock.Unlock()
+
+       if _, exist := xs.sockEvents[evName]; !exist {
+               // Register listener only the first time
+               evn := evName
+
+               // FIXME: use generic type: data interface{} instead of data xsapiv1.EventMsg
+               var err error
+               if evName == "event:folder-state-change" {
+                       err = xs.ioSock.On(evn, func(data xsapiv1.EventMsg) error {
+                               xs.sockEventsLock.Lock()
+                               sEvts := make([]*caller, len(xs.sockEvents[evn]))
+                               copy(sEvts, xs.sockEvents[evn])
+                               xs.sockEventsLock.Unlock()
+                               for _, c := range sEvts {
+                                       c.Func(c.PrivateData, data)
+                               }
+                               return nil
+                       })
+               } else {
+                       err = xs.ioSock.On(evn, func(data interface{}) error {
+                               xs.sockEventsLock.Lock()
+                               sEvts := make([]*caller, len(xs.sockEvents[evn]))
+                               copy(sEvts, xs.sockEvents[evn])
+                               xs.sockEventsLock.Unlock()
+                               for _, c := range sEvts {
+                                       c.Func(c.PrivateData, data)
+                               }
+                               return nil
+                       })
+               }
+               if err != nil {
+                       return uuid.Nil, err
+               }
+       }
 
-       })
-       */
-       return xs.ioSock.On(message, f)
+       c := &caller{
+               id:          uuid.NewV1(),
+               EventName:   evName,
+               Func:        f,
+               PrivateData: privData,
+       }
+
+       xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
+       return c.id, nil
+}
+
+// EventOff Un-register a (or all) callbacks associated to an event
+func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
+       xs.sockEventsLock.Lock()
+       defer xs.sockEventsLock.Unlock()
+       if _, exist := xs.sockEvents[evName]; exist {
+               if id == uuid.Nil {
+                       // Un-register all
+                       xs.sockEvents[evName] = []*caller{}
+               } else {
+                       // Un-register only the specified callback
+                       for i, ff := range xs.sockEvents[evName] {
+                               if uuid.Equal(ff.id, id) {
+                                       xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
+                                       break
+                               }
+                       }
+               }
+       }
+       return nil
 }
 
-// ProjectToFolder
-func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *FolderConfig {
+// ProjectToFolder Convert Project structure to Folder structure
+func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig {
        stID := ""
-       if pPrj.Type == XdsTypeCloudSync {
+       if pPrj.Type == xsapiv1.TypeCloudSync {
                stID, _ = xs.SThg.IDGet()
        }
-       fPrj := FolderConfig{
+       // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
+       fPrj := xsapiv1.FolderConfig{
                ID:         pPrj.ID,
                Label:      pPrj.Label,
                ClientPath: pPrj.ClientPath,
-               Type:       FolderType(pPrj.Type),
+               Type:       xsapiv1.FolderType(pPrj.Type),
                Status:     pPrj.Status,
                IsInSync:   pPrj.IsInSync,
                DefaultSdk: pPrj.DefaultSdk,
-               DataPathMap: PathMapConfig{
+               ClientData: pPrj.ClientData,
+               DataPathMap: xsapiv1.PathMapConfig{
                        ServerPath: pPrj.ServerPath,
                },
-               DataCloudSync: CloudSyncConfig{
-                       SyncThingID: stID,
+               DataCloudSync: xsapiv1.CloudSyncConfig{
+                       SyncThingID:   stID,
+                       STLocIsInSync: pPrj.IsInSync,
+                       STLocStatus:   pPrj.Status,
+                       STSvrIsInSync: pPrj.IsInSync,
+                       STSvrStatus:   pPrj.Status,
                },
        }
+
        return &fPrj
 }
 
-// FolderToProject
-func (xs *XdsServer) FolderToProject(fPrj FolderConfig) ProjectConfig {
-       pPrj := ProjectConfig{
+// FolderToProject Convert Folder structure to Project structure
+func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig {
+       inSync := fPrj.IsInSync
+       sts := fPrj.Status
+
+       if fPrj.Type == xsapiv1.TypeCloudSync {
+               inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
+
+               sts = fPrj.DataCloudSync.STSvrStatus
+               switch fPrj.DataCloudSync.STLocStatus {
+               case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause:
+                       sts = fPrj.DataCloudSync.STLocStatus
+                       break
+               case xaapiv1.StatusSyncing:
+                       if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause {
+                               sts = xaapiv1.StatusSyncing
+                       }
+                       break
+               case xaapiv1.StatusEnable:
+                       // keep STSvrStatus
+                       break
+               }
+       }
+
+       pPrj := xaapiv1.ProjectConfig{
                ID:         fPrj.ID,
                ServerID:   xs.ID,
                Label:      fPrj.Label,
                ClientPath: fPrj.ClientPath,
                ServerPath: fPrj.DataPathMap.ServerPath,
-               Type:       ProjectType(fPrj.Type),
-               Status:     fPrj.Status,
-               IsInSync:   fPrj.IsInSync,
+               Type:       xaapiv1.ProjectType(fPrj.Type),
+               Status:     sts,
+               IsInSync:   inSync,
                DefaultSdk: fPrj.DefaultSdk,
+               ClientData: fPrj.ClientData,
        }
        return pPrj
 }
 
+// CommandAdd Add a new command to the list of running commands
+func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error {
+       if xs.CommandGet(cmdID) != nil {
+               return fmt.Errorf("command id already exist")
+       }
+       xs.cmdList[cmdID] = data
+       return nil
+}
+
+// CommandDelete Delete a command from the command list
+func (xs *XdsServer) CommandDelete(cmdID string) error {
+       if xs.CommandGet(cmdID) == nil {
+               return fmt.Errorf("unknown command id")
+       }
+       delete(xs.cmdList, cmdID)
+       return nil
+}
+
+// CommandGet Retrieve a command data
+func (xs *XdsServer) CommandGet(cmdID string) interface{} {
+       d, exist := xs.cmdList[cmdID]
+       if exist {
+               return d
+       }
+       return nil
+}
+
 /***
 ** Private functions
 ***/
@@ -363,8 +498,8 @@ func (xs *XdsServer) _reconnect() error {
 //  Established HTTP and WS connection and retrieve XDSServer config
 func (xs *XdsServer) _connect(reConn bool) error {
 
-       xdsCfg := xdsServerConfig{}
-       if err := xs.HTTPGet("/config", &xdsCfg); err != nil {
+       xdsCfg := xsapiv1.APIConfig{}
+       if err := xs.client.Get("/config", &xdsCfg); err != nil {
                xs.Connected = false
                if !reConn {
                        xs._NotifyState()
@@ -372,12 +507,12 @@ func (xs *XdsServer) _connect(reConn bool) error {
                return err
        }
 
-       if reConn && xs.ID != xdsCfg.ID {
-               xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
+       if reConn && xs.ID != xdsCfg.ServerUID {
+               xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID)
        }
 
        // Update local XDS config
-       xs.ID = xdsCfg.ID
+       xs.ID = xdsCfg.ServerUID
        xs.ServerConfig = &xdsCfg
 
        // Establish WS connection and register listen
@@ -458,7 +593,7 @@ func (xs *XdsServer) _SocketConnect() error {
 // Send event to notify changes
 func (xs *XdsServer) _NotifyState() {
 
-       evSts := ServerCfg{
+       evSts := xaapiv1.ServerCfg{
                ID:         xs.ID,
                URL:        xs.BaseURL,
                APIURL:     xs.APIURL,
@@ -466,7 +601,7 @@ func (xs *XdsServer) _NotifyState() {
                ConnRetry:  xs.ConnRetry,
                Connected:  xs.Connected,
        }
-       if err := xs.events.Emit(EVTServerConfig, evSts); err != nil {
+       if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil {
                xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
        }
 }