Wait folder insync before sending exit event.
authorSebastien Douheret <sebastien.douheret@iot.bzh>
Sat, 27 May 2017 21:10:33 +0000 (23:10 +0200)
committerSebastien Douheret <sebastien.douheret@iot.bzh>
Sat, 27 May 2017 21:10:33 +0000 (23:10 +0200)
By default wait folder insync.
Set ExitImmediate param to true to send exit event immedialty without waiting folder synchronization.

lib/apiv1/exec.go
lib/apiv1/make.go
lib/common/execPipeWs.go
lib/model/folder.go
lib/syncthing/st.go

index 895807d..675f6fb 100644 (file)
@@ -12,13 +12,14 @@ import (
 
 // ExecArgs JSON parameters of /exec command
 type ExecArgs struct {
-       ID         string   `json:"id" binding:"required"`
-       SdkID      string   `json:"sdkid"` // sdk ID to use for setting env
-       Cmd        string   `json:"cmd" binding:"required"`
-       Args       []string `json:"args"`
-       Env        []string `json:"env"`
-       RPath      string   `json:"rpath"`   // relative path into project
-       CmdTimeout int      `json:"timeout"` // command completion timeout in Second
+       ID            string   `json:"id" binding:"required"`
+       SdkID         string   `json:"sdkid"` // sdk ID to use for setting env
+       Cmd           string   `json:"cmd" binding:"required"`
+       Args          []string `json:"args"`
+       Env           []string `json:"env"`
+       RPath         string   `json:"rpath"`         // relative path into project
+       ExitImmediate bool     `json:"exitImmediate"` // when true, exit event sent immediately when command exited (IOW, don't wait file synchronization)
+       CmdTimeout    int      `json:"timeout"`       // command completion timeout in Second
 }
 
 // ExecOutMsg Message send on each output (stdout+stderr) of executed command
@@ -122,7 +123,7 @@ func (s *APIService) execCmd(c *gin.Context) {
        }
 
        // Define callback for output
-       eCB := func(sid string, id int, code int, err error) {
+       eCB := func(sid string, id int, code int, err error, data *map[string]interface{}) {
                s.log.Debugf("Command [Cmd ID %d] exited: code %d, error: %v", id, code, err)
 
                // IO socket can be nil when disconnected
@@ -132,6 +133,30 @@ func (s *APIService) execCmd(c *gin.Context) {
                        return
                }
 
+               // Retrieve project ID and RootPath
+               prjID := (*data)["ID"].(string)
+               exitImm := (*data)["ExitImmediate"].(bool)
+
+               // XXX - workaround to be sure that Syncthing detected all changes
+               if err := s.mfolder.ForceSync(prjID); err != nil {
+                       s.log.Errorf("Error while syncing folder %s: %v", prjID, err)
+               }
+               if !exitImm {
+                       // Wait end of file sync
+                       // FIXME pass as argument
+                       tmo := 60
+                       for t := tmo; t > 0; t-- {
+                               s.log.Debugf("Wait file insync for %s (%d/%d)", prjID, t, tmo)
+                               if sync, err := s.mfolder.IsFolderInSync(prjID); sync || err != nil {
+                                       if err != nil {
+                                               s.log.Errorf("ERROR IsFolderInSync (%s): %v", prjID, err)
+                                       }
+                                       break
+                               }
+                               time.Sleep(time.Second)
+                       }
+               }
+
                // FIXME replace by .BroadcastTo a room
                e := (*so).Emit(ExecExitEvent, ExecExitMsg{
                        CmdID:     strconv.Itoa(id),
@@ -164,6 +189,7 @@ func (s *APIService) execCmd(c *gin.Context) {
        data := make(map[string]interface{})
        data["ID"] = prj.ID
        data["RootPath"] = prj.RootPath
+       data["ExitImmediate"] = args.ExitImmediate
 
        err := common.ExecPipeWs(cmd, args.Env, sop, sess.ID, cmdID, execTmo, s.log, oCB, eCB, &data)
        if err != nil {
index 098e41c..d015d2b 100644 (file)
@@ -2,6 +2,7 @@ package apiv1
 
 import (
        "net/http"
+       "strings"
 
        "time"
 
@@ -95,7 +96,16 @@ func (s *APIService) buildMake(c *gin.Context) {
                        s.log.Infof("%s not emitted: WS closed - sid: %s - msg id:%d", MakeOutEvent, sid, id)
                        return
                }
-               s.log.Debugf("%s emitted - WS sid %s - id:%d", MakeOutEvent, sid, id)
+
+               // Retrieve project ID and RootPath
+               prjID := (*data)["ID"].(string)
+               prjRootPath := (*data)["RootPath"].(string)
+
+               // Cleanup any references to internal rootpath in stdout & stderr
+               stdout = strings.Replace(stdout, prjRootPath, "", -1)
+               stderr = strings.Replace(stderr, prjRootPath, "", -1)
+
+               s.log.Debugf("%s emitted - WS sid %s - id:%d - prjID:%s", MakeOutEvent, sid, id, prjID)
 
                // FIXME replace by .BroadcastTo a room
                err := (*so).Emit(MakeOutEvent, MakeOutMsg{
@@ -110,7 +120,7 @@ func (s *APIService) buildMake(c *gin.Context) {
        }
 
        // Define callback for output
-       eCB := func(sid string, id int, code int, err error) {
+       eCB := func(sid string, id int, code int, err error, data *map[string]interface{}) {
                s.log.Debugf("Command [Cmd ID %d] exited: code %d, error: %v", id, code, err)
 
                // IO socket can be nil when disconnected
@@ -120,6 +130,14 @@ func (s *APIService) buildMake(c *gin.Context) {
                        return
                }
 
+               // Retrieve project ID and RootPath
+               prjID := (*data)["ID"].(string)
+
+               // XXX - workaround to be sure that Syncthing detected all changes
+               if err := s.mfolder.ForceSync(prjID); err != nil {
+                       s.log.Errorf("Error while syncing folder %s: %v", prjID, err)
+               }
+
                // FIXME replace by .BroadcastTo a room
                e := (*so).Emit(MakeExitEvent, MakeExitMsg{
                        CmdID:     strconv.Itoa(id),
@@ -148,6 +166,11 @@ func (s *APIService) buildMake(c *gin.Context) {
        }
 
        s.log.Debugf("Execute [Cmd ID %d]: %v", cmdID, cmd)
+
+       data := make(map[string]interface{})
+       data["ID"] = prj.ID
+       data["RootPath"] = prj.RootPath
+
        err := common.ExecPipeWs(cmd, args.Env, sop, sess.ID, cmdID, execTmo, s.log, oCB, eCB, nil)
        if err != nil {
                common.APIError(c, err.Error())
index 4994d9d..9bb4517 100644 (file)
@@ -18,7 +18,7 @@ import (
 type EmitOutputCB func(sid string, cmdID int, stdout, stderr string, data *map[string]interface{})
 
 // EmitExitCB is the function callback used to emit exit proc code
-type EmitExitCB func(sid string, cmdID int, code int, err error)
+type EmitExitCB func(sid string, cmdID int, code int, err error, data *map[string]interface{})
 
 // Inspired by :
 // https://github.com/gorilla/websocket/blob/master/examples/command/main.go
@@ -63,7 +63,7 @@ func ExecPipeWs(cmd []string, env []string, so *socketio.Socket, sid string, cmd
                go cmdPumpStdout(so, outr, stdoutDone, sid, cmdID, log, eoCB, data)
 
                // Blocking function that poll input or wait for end of process
-               cmdPumpStdin(so, inw, proc, sid, cmdID, cmdExecTimeout, log, eeCB)
+               cmdPumpStdin(so, inw, proc, sid, cmdID, cmdExecTimeout, log, eeCB, data)
 
                // Some commands will exit when stdin is closed.
                inw.Close()
@@ -94,7 +94,8 @@ func ExecPipeWs(cmd []string, env []string, so *socketio.Socket, sid string, cmd
 }
 
 func cmdPumpStdin(so *socketio.Socket, w io.Writer, proc *os.Process,
-       sid string, cmdID int, tmo int, log *logrus.Logger, exitFuncCB EmitExitCB) {
+       sid string, cmdID int, tmo int, log *logrus.Logger, exitFuncCB EmitExitCB,
+       data *map[string]interface{}) {
        /* XXX - code to add to support stdin through WS
        for {
                _, message, err := so. ?? ReadMessage()
@@ -127,10 +128,10 @@ func cmdPumpStdin(so *socketio.Socket, w io.Writer, proc *os.Process,
        // Wait cmd complete
        select {
        case dC := <-done:
-               exitFuncCB(sid, cmdID, dC.status, dC.err)
+               exitFuncCB(sid, cmdID, dC.status, dC.err, data)
        case <-time.After(time.Duration(tmo) * time.Second):
                exitFuncCB(sid, cmdID, -99,
-                       fmt.Errorf("Exit Timeout for command ID %v", cmdID))
+                       fmt.Errorf("Exit Timeout for command ID %v", cmdID), data)
        }
 }
 
index e461f9c..fa94409 100644 (file)
@@ -98,3 +98,13 @@ func (c *Folder) DeleteFolder(id string) (xdsconfig.FolderConfig, error) {
 
        return fld, err
 }
+
+// ForceSync Force the synchronization of a folder
+func (c *Folder) ForceSync(id string) error {
+       return c.SThg.FolderScan(id, "")
+}
+
+// IsFolderInSync Returns true when folder is in sync
+func (c *Folder) IsFolderInSync(id string) (bool, error) {
+       return c.SThg.IsFolderInSync(id)
+}
index 957dd65..75bdf80 100644 (file)
@@ -49,6 +49,42 @@ type ExitChan struct {
        err    error
 }
 
+// ConfigInSync Check whether if Syncthing configuration is in sync
+type configInSync struct {
+       ConfigInSync bool `json:"configInSync"`
+}
+
+// FolderStatus Information about the current status of a folder.
+type FolderStatus struct {
+       GlobalFiles       int   `json:"globalFiles"`
+       GlobalDirectories int   `json:"globalDirectories"`
+       GlobalSymlinks    int   `json:"globalSymlinks"`
+       GlobalDeleted     int   `json:"globalDeleted"`
+       GlobalBytes       int64 `json:"globalBytes"`
+
+       LocalFiles       int   `json:"localFiles"`
+       LocalDirectories int   `json:"localDirectories"`
+       LocalSymlinks    int   `json:"localSymlinks"`
+       LocalDeleted     int   `json:"localDeleted"`
+       LocalBytes       int64 `json:"localBytes"`
+
+       NeedFiles       int   `json:"needFiles"`
+       NeedDirectories int   `json:"needDirectories"`
+       NeedSymlinks    int   `json:"needSymlinks"`
+       NeedDeletes     int   `json:"needDeletes"`
+       NeedBytes       int64 `json:"needBytes"`
+
+       InSyncFiles int   `json:"inSyncFiles"`
+       InSyncBytes int64 `json:"inSyncBytes"`
+
+       State        string    `json:"state"`
+       StateChanged time.Time `json:"stateChanged"`
+
+       Sequence int64 `json:"sequence"`
+
+       IgnorePatterns bool `json:"ignorePatterns"`
+}
+
 // NewSyncThing creates a new instance of Syncthing
 func NewSyncThing(conf *xdsconfig.Config, log *logrus.Logger) *SyncThing {
        var url, apiKey, home, binDir string
@@ -309,3 +345,57 @@ func (s *SyncThing) ConfigSet(cfg config.Configuration) error {
        }
        return s.client.HTTPPost("system/config", string(body))
 }
+
+// IsConfigInSync Returns true if configuration is in sync
+func (s *SyncThing) IsConfigInSync() (bool, error) {
+       var data []byte
+       var d configInSync
+       if err := s.client.HTTPGet("system/config/insync", &data); err != nil {
+               return false, err
+       }
+       if err := json.Unmarshal(data, &d); err != nil {
+               return false, err
+       }
+       return d.ConfigInSync, nil
+}
+
+// FolderStatus Returns all information about the current
+func (s *SyncThing) FolderStatus(folderID string) (*FolderStatus, error) {
+       var data []byte
+       var res FolderStatus
+       if folderID == "" {
+               return nil, fmt.Errorf("folderID not set")
+       }
+       if err := s.client.HTTPGet("db/status?folder="+folderID, &data); err != nil {
+               return nil, err
+       }
+       if err := json.Unmarshal(data, &res); err != nil {
+               return nil, err
+       }
+       return &res, nil
+}
+
+// IsFolderInSync Returns true when folder is in sync
+func (s *SyncThing) IsFolderInSync(folderID string) (bool, error) {
+       // FIXME better to detected FolderCompletion event (/rest/events)
+       // See https://docs.syncthing.net/dev/events.html
+       sts, err := s.FolderStatus(folderID)
+       if err != nil {
+               return false, err
+       }
+       return sts.NeedBytes == 0, nil
+}
+
+// FolderScan Request immediate folder scan.
+// Scan all folders if folderID param is empty
+func (s *SyncThing) FolderScan(folderID string, subpath string) error {
+       url := "db/scan"
+       if folderID != "" {
+               url += "?folder=" + folderID
+
+               if subpath != "" {
+                       url += "&sub=" + subpath
+               }
+       }
+       return s.client.HTTPPost(url, "")
+}