From: Sebastien Douheret Date: Sat, 27 May 2017 21:10:33 +0000 (+0200) Subject: Wait folder insync before sending exit event. X-Git-Tag: v0.0.1-alpha~8 X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?p=src%2Fxds%2Fxds-server.git;a=commitdiff_plain;h=330cffa06c3efea42a42ca8e908b8b24db6bba3f Wait folder insync before sending exit event. By default wait folder insync. Set ExitImmediate param to true to send exit event immedialty without waiting folder synchronization. --- diff --git a/lib/apiv1/exec.go b/lib/apiv1/exec.go index 895807d..675f6fb 100644 --- a/lib/apiv1/exec.go +++ b/lib/apiv1/exec.go @@ -12,13 +12,14 @@ import ( // ExecArgs JSON parameters of /exec command type ExecArgs struct { - ID string `json:"id" binding:"required"` - SdkID string `json:"sdkid"` // sdk ID to use for setting env - Cmd string `json:"cmd" binding:"required"` - Args []string `json:"args"` - Env []string `json:"env"` - RPath string `json:"rpath"` // relative path into project - CmdTimeout int `json:"timeout"` // command completion timeout in Second + ID string `json:"id" binding:"required"` + SdkID string `json:"sdkid"` // sdk ID to use for setting env + Cmd string `json:"cmd" binding:"required"` + Args []string `json:"args"` + Env []string `json:"env"` + RPath string `json:"rpath"` // relative path into project + ExitImmediate bool `json:"exitImmediate"` // when true, exit event sent immediately when command exited (IOW, don't wait file synchronization) + CmdTimeout int `json:"timeout"` // command completion timeout in Second } // ExecOutMsg Message send on each output (stdout+stderr) of executed command @@ -122,7 +123,7 @@ func (s *APIService) execCmd(c *gin.Context) { } // Define callback for output - eCB := func(sid string, id int, code int, err error) { + eCB := func(sid string, id int, code int, err error, data *map[string]interface{}) { s.log.Debugf("Command [Cmd ID %d] exited: code %d, error: %v", id, code, err) // IO socket can be nil when disconnected @@ -132,6 +133,30 @@ func (s *APIService) execCmd(c *gin.Context) { return } + // Retrieve project ID and RootPath + prjID := (*data)["ID"].(string) + exitImm := (*data)["ExitImmediate"].(bool) + + // XXX - workaround to be sure that Syncthing detected all changes + if err := s.mfolder.ForceSync(prjID); err != nil { + s.log.Errorf("Error while syncing folder %s: %v", prjID, err) + } + if !exitImm { + // Wait end of file sync + // FIXME pass as argument + tmo := 60 + for t := tmo; t > 0; t-- { + s.log.Debugf("Wait file insync for %s (%d/%d)", prjID, t, tmo) + if sync, err := s.mfolder.IsFolderInSync(prjID); sync || err != nil { + if err != nil { + s.log.Errorf("ERROR IsFolderInSync (%s): %v", prjID, err) + } + break + } + time.Sleep(time.Second) + } + } + // FIXME replace by .BroadcastTo a room e := (*so).Emit(ExecExitEvent, ExecExitMsg{ CmdID: strconv.Itoa(id), @@ -164,6 +189,7 @@ func (s *APIService) execCmd(c *gin.Context) { data := make(map[string]interface{}) data["ID"] = prj.ID data["RootPath"] = prj.RootPath + data["ExitImmediate"] = args.ExitImmediate err := common.ExecPipeWs(cmd, args.Env, sop, sess.ID, cmdID, execTmo, s.log, oCB, eCB, &data) if err != nil { diff --git a/lib/apiv1/make.go b/lib/apiv1/make.go index 098e41c..d015d2b 100644 --- a/lib/apiv1/make.go +++ b/lib/apiv1/make.go @@ -2,6 +2,7 @@ package apiv1 import ( "net/http" + "strings" "time" @@ -95,7 +96,16 @@ func (s *APIService) buildMake(c *gin.Context) { s.log.Infof("%s not emitted: WS closed - sid: %s - msg id:%d", MakeOutEvent, sid, id) return } - s.log.Debugf("%s emitted - WS sid %s - id:%d", MakeOutEvent, sid, id) + + // Retrieve project ID and RootPath + prjID := (*data)["ID"].(string) + prjRootPath := (*data)["RootPath"].(string) + + // Cleanup any references to internal rootpath in stdout & stderr + stdout = strings.Replace(stdout, prjRootPath, "", -1) + stderr = strings.Replace(stderr, prjRootPath, "", -1) + + s.log.Debugf("%s emitted - WS sid %s - id:%d - prjID:%s", MakeOutEvent, sid, id, prjID) // FIXME replace by .BroadcastTo a room err := (*so).Emit(MakeOutEvent, MakeOutMsg{ @@ -110,7 +120,7 @@ func (s *APIService) buildMake(c *gin.Context) { } // Define callback for output - eCB := func(sid string, id int, code int, err error) { + eCB := func(sid string, id int, code int, err error, data *map[string]interface{}) { s.log.Debugf("Command [Cmd ID %d] exited: code %d, error: %v", id, code, err) // IO socket can be nil when disconnected @@ -120,6 +130,14 @@ func (s *APIService) buildMake(c *gin.Context) { return } + // Retrieve project ID and RootPath + prjID := (*data)["ID"].(string) + + // XXX - workaround to be sure that Syncthing detected all changes + if err := s.mfolder.ForceSync(prjID); err != nil { + s.log.Errorf("Error while syncing folder %s: %v", prjID, err) + } + // FIXME replace by .BroadcastTo a room e := (*so).Emit(MakeExitEvent, MakeExitMsg{ CmdID: strconv.Itoa(id), @@ -148,6 +166,11 @@ func (s *APIService) buildMake(c *gin.Context) { } s.log.Debugf("Execute [Cmd ID %d]: %v", cmdID, cmd) + + data := make(map[string]interface{}) + data["ID"] = prj.ID + data["RootPath"] = prj.RootPath + err := common.ExecPipeWs(cmd, args.Env, sop, sess.ID, cmdID, execTmo, s.log, oCB, eCB, nil) if err != nil { common.APIError(c, err.Error()) diff --git a/lib/common/execPipeWs.go b/lib/common/execPipeWs.go index 4994d9d..9bb4517 100644 --- a/lib/common/execPipeWs.go +++ b/lib/common/execPipeWs.go @@ -18,7 +18,7 @@ import ( type EmitOutputCB func(sid string, cmdID int, stdout, stderr string, data *map[string]interface{}) // EmitExitCB is the function callback used to emit exit proc code -type EmitExitCB func(sid string, cmdID int, code int, err error) +type EmitExitCB func(sid string, cmdID int, code int, err error, data *map[string]interface{}) // Inspired by : // https://github.com/gorilla/websocket/blob/master/examples/command/main.go @@ -63,7 +63,7 @@ func ExecPipeWs(cmd []string, env []string, so *socketio.Socket, sid string, cmd go cmdPumpStdout(so, outr, stdoutDone, sid, cmdID, log, eoCB, data) // Blocking function that poll input or wait for end of process - cmdPumpStdin(so, inw, proc, sid, cmdID, cmdExecTimeout, log, eeCB) + cmdPumpStdin(so, inw, proc, sid, cmdID, cmdExecTimeout, log, eeCB, data) // Some commands will exit when stdin is closed. inw.Close() @@ -94,7 +94,8 @@ func ExecPipeWs(cmd []string, env []string, so *socketio.Socket, sid string, cmd } func cmdPumpStdin(so *socketio.Socket, w io.Writer, proc *os.Process, - sid string, cmdID int, tmo int, log *logrus.Logger, exitFuncCB EmitExitCB) { + sid string, cmdID int, tmo int, log *logrus.Logger, exitFuncCB EmitExitCB, + data *map[string]interface{}) { /* XXX - code to add to support stdin through WS for { _, message, err := so. ?? ReadMessage() @@ -127,10 +128,10 @@ func cmdPumpStdin(so *socketio.Socket, w io.Writer, proc *os.Process, // Wait cmd complete select { case dC := <-done: - exitFuncCB(sid, cmdID, dC.status, dC.err) + exitFuncCB(sid, cmdID, dC.status, dC.err, data) case <-time.After(time.Duration(tmo) * time.Second): exitFuncCB(sid, cmdID, -99, - fmt.Errorf("Exit Timeout for command ID %v", cmdID)) + fmt.Errorf("Exit Timeout for command ID %v", cmdID), data) } } diff --git a/lib/model/folder.go b/lib/model/folder.go index e461f9c..fa94409 100644 --- a/lib/model/folder.go +++ b/lib/model/folder.go @@ -98,3 +98,13 @@ func (c *Folder) DeleteFolder(id string) (xdsconfig.FolderConfig, error) { return fld, err } + +// ForceSync Force the synchronization of a folder +func (c *Folder) ForceSync(id string) error { + return c.SThg.FolderScan(id, "") +} + +// IsFolderInSync Returns true when folder is in sync +func (c *Folder) IsFolderInSync(id string) (bool, error) { + return c.SThg.IsFolderInSync(id) +} diff --git a/lib/syncthing/st.go b/lib/syncthing/st.go index 957dd65..75bdf80 100644 --- a/lib/syncthing/st.go +++ b/lib/syncthing/st.go @@ -49,6 +49,42 @@ type ExitChan struct { err error } +// ConfigInSync Check whether if Syncthing configuration is in sync +type configInSync struct { + ConfigInSync bool `json:"configInSync"` +} + +// FolderStatus Information about the current status of a folder. +type FolderStatus struct { + GlobalFiles int `json:"globalFiles"` + GlobalDirectories int `json:"globalDirectories"` + GlobalSymlinks int `json:"globalSymlinks"` + GlobalDeleted int `json:"globalDeleted"` + GlobalBytes int64 `json:"globalBytes"` + + LocalFiles int `json:"localFiles"` + LocalDirectories int `json:"localDirectories"` + LocalSymlinks int `json:"localSymlinks"` + LocalDeleted int `json:"localDeleted"` + LocalBytes int64 `json:"localBytes"` + + NeedFiles int `json:"needFiles"` + NeedDirectories int `json:"needDirectories"` + NeedSymlinks int `json:"needSymlinks"` + NeedDeletes int `json:"needDeletes"` + NeedBytes int64 `json:"needBytes"` + + InSyncFiles int `json:"inSyncFiles"` + InSyncBytes int64 `json:"inSyncBytes"` + + State string `json:"state"` + StateChanged time.Time `json:"stateChanged"` + + Sequence int64 `json:"sequence"` + + IgnorePatterns bool `json:"ignorePatterns"` +} + // NewSyncThing creates a new instance of Syncthing func NewSyncThing(conf *xdsconfig.Config, log *logrus.Logger) *SyncThing { var url, apiKey, home, binDir string @@ -309,3 +345,57 @@ func (s *SyncThing) ConfigSet(cfg config.Configuration) error { } return s.client.HTTPPost("system/config", string(body)) } + +// IsConfigInSync Returns true if configuration is in sync +func (s *SyncThing) IsConfigInSync() (bool, error) { + var data []byte + var d configInSync + if err := s.client.HTTPGet("system/config/insync", &data); err != nil { + return false, err + } + if err := json.Unmarshal(data, &d); err != nil { + return false, err + } + return d.ConfigInSync, nil +} + +// FolderStatus Returns all information about the current +func (s *SyncThing) FolderStatus(folderID string) (*FolderStatus, error) { + var data []byte + var res FolderStatus + if folderID == "" { + return nil, fmt.Errorf("folderID not set") + } + if err := s.client.HTTPGet("db/status?folder="+folderID, &data); err != nil { + return nil, err + } + if err := json.Unmarshal(data, &res); err != nil { + return nil, err + } + return &res, nil +} + +// IsFolderInSync Returns true when folder is in sync +func (s *SyncThing) IsFolderInSync(folderID string) (bool, error) { + // FIXME better to detected FolderCompletion event (/rest/events) + // See https://docs.syncthing.net/dev/events.html + sts, err := s.FolderStatus(folderID) + if err != nil { + return false, err + } + return sts.NeedBytes == 0, nil +} + +// FolderScan Request immediate folder scan. +// Scan all folders if folderID param is empty +func (s *SyncThing) FolderScan(folderID string, subpath string) error { + url := "db/scan" + if folderID != "" { + url += "?folder=" + folderID + + if subpath != "" { + url += "&sub=" + subpath + } + } + return s.client.HTTPPost(url, "") +}