From: Sebastien Douheret Date: Wed, 8 Nov 2017 09:19:54 +0000 (+0100) Subject: Fixed events definition and callback processing. X-Git-Tag: v1.0.0-rc1~22 X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?p=src%2Fxds%2Fxds-agent.git;a=commitdiff_plain;h=02aec942b44eecd2ea9b311bb4ba2d60cce21e9a Fixed events definition and callback processing. --- diff --git a/lib/agent/apiv1-exec.go b/lib/agent/apiv1-exec.go index 9c65bc2..c199267 100644 --- a/lib/agent/apiv1-exec.go +++ b/lib/agent/apiv1-exec.go @@ -12,6 +12,7 @@ import ( ) var execCmdID = 1 +var fwdFuncID []uuid.UUID // ExecCmd executes remotely a command func (s *APIService) execCmd(c *gin.Context) { @@ -38,11 +39,15 @@ func (s *APIService) _execRequest(cmd string, c *gin.Context) { } // First get Project ID to retrieve Server ID and send command to right server - id := c.Param("id") - if id == "" { - id = args.ID + iid := c.Param("id") + if iid == "" { + iid = args.ID + } + id, err := s.projects.ResolveID(iid) + if err != nil { + common.APIError(c, err.Error()) + return } - prj := s.projects.Get(id) if prj == nil { common.APIError(c, "Unknown id") @@ -75,15 +80,23 @@ func (s *APIService) _execRequest(cmd string, c *gin.Context) { apiv1.ExecInferiorInEvent, apiv1.ExecInferiorOutEvent, } - so := *sock - fwdFuncID := []uuid.UUID{} + for _, evName := range evtList { evN := evName - fwdFunc := func(evData interface{}) { + fwdFunc := func(pData interface{}, evData interface{}) error { + sid := pData.(string) + // IO socket can be nil when disconnected + so := s.sessions.IOSocketGet(sid) + if so == nil { + s.Log.Infof("%s not emitted: WS closed (sid:%s)", evN, sid) + return nil + } + // Forward event to Client/Dashboard - so.Emit(evN, evData) + (*so).Emit(evN, evData) + return nil } - id, err := svr.EventOn(evN, fwdFunc) + id, err := svr.EventOn(evN, sess.ID, fwdFunc) if err != nil { common.APIError(c, err.Error()) return @@ -93,16 +106,28 @@ func (s *APIService) _execRequest(cmd string, c *gin.Context) { // Handle Exit event separately to cleanup registered listener var exitFuncID uuid.UUID - exitFunc := func(evData interface{}) { - so.Emit(apiv1.ExecExitEvent, evData) + exitFunc := func(pData interface{}, evData interface{}) error { + evN := apiv1.ExecExitEvent + sid := pData.(string) + + // IO socket can be nil when disconnected + so := s.sessions.IOSocketGet(sid) + if so == nil { + s.Log.Infof("%s not emitted: WS closed (sid:%s)", evN, sid) + return nil + } + + (*so).Emit(evN, evData) // cleanup listener for i, evName := range evtList { svr.EventOff(evName, fwdFuncID[i]) } - svr.EventOff(apiv1.ExecExitEvent, exitFuncID) + svr.EventOff(evN, exitFuncID) + + return nil } - exitFuncID, err = svr.EventOn(apiv1.ExecExitEvent, exitFunc) + exitFuncID, err = svr.EventOn(apiv1.ExecExitEvent, sess.ID, exitFunc) if err != nil { common.APIError(c, err.Error()) return diff --git a/lib/agent/events.go b/lib/agent/events.go index 046c377..2684ff5 100644 --- a/lib/agent/events.go +++ b/lib/agent/events.go @@ -104,8 +104,9 @@ func (e *Events) Emit(evName string, data interface{}) error { Type: evName, Data: data, } - if err := (*so).Emit(apiv1.EventTypePrefix+evName, msg); err != nil { - e.Log.Errorf("WS Emit %v error : %v", apiv1.EventTypePrefix+evName, err) + e.Log.Debugf("Emit Event %s: %v", evName, sid) + if err := (*so).Emit(evName, msg); err != nil { + e.Log.Errorf("WS Emit %v error : %v", evName, err) if firstErr == nil { firstErr = err } diff --git a/lib/agent/project-st.go b/lib/agent/project-st.go index dba5978..cd55656 100644 --- a/lib/agent/project-st.go +++ b/lib/agent/project-st.go @@ -85,7 +85,7 @@ func (p *STProject) UpdateProject(prj apiv1.ProjectConfig) (*apiv1.ProjectConfig // Register events to update folder status // Register to XDS Server events - p.server.EventOn("event:FolderStateChanged", p._cbServerFolderChanged) + p.server.EventOn("event:FolderStateChanged", "", p._cbServerFolderChanged) if err := p.server.EventRegister("FolderStateChanged", svrPrj.ID); err != nil { p.Log.Warningf("XDS Server EventRegister failed: %v", err) return svrPrj, err @@ -128,12 +128,12 @@ func (p *STProject) IsInSync() (bool, error) { // callback use to update (XDS Server) folder IsInSync status -func (p *STProject) _cbServerFolderChanged(data interface{}) { +func (p *STProject) _cbServerFolderChanged(pData interface{}, data interface{}) error { evt := data.(XdsEventFolderChange) // Only process event that concerns this project/folder ID if p.folder.ID != evt.Folder.ID { - return + return nil } if evt.Folder.IsInSync != p.folder.DataCloudSync.STSvrIsInSync || @@ -146,6 +146,7 @@ func (p *STProject) _cbServerFolderChanged(data interface{}) { p.Log.Warningf("Cannot notify project change: %v", err) } } + return nil } // callback use to update IsInSync status diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go index 2037094..73a5bd9 100644 --- a/lib/agent/xdsserver.go +++ b/lib/agent/xdsserver.go @@ -112,11 +112,15 @@ type XdsEventFolderChange struct { Folder XdsFolderConfig `json:"folder"` } +// Event emitter callback +type EventCB func(privData interface{}, evtData interface{}) error + // caller Used to chain event listeners type caller struct { - id uuid.UUID - EventName string - Func func(interface{}) + id uuid.UUID + EventName string + Func EventCB + PrivateData interface{} } const _IDTempoPrefix = "tempo-" @@ -316,7 +320,7 @@ func (xs *XdsServer) EventRegister(evName string, id string) error { } // EventOn Register a callback on events reception -func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, error) { +func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) { if xs.ioSock == nil { return uuid.Nil, fmt.Errorf("Io.Socket not initialized") } @@ -333,14 +337,25 @@ func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, err if evName == "event:FolderStateChanged" { err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error { xs.sockEventsLock.Lock() - defer xs.sockEventsLock.Unlock() - for _, c := range xs.sockEvents[evn] { - c.Func(data) + sEvts := make([]*caller, len(xs.sockEvents[evn])) + copy(sEvts, xs.sockEvents[evn]) + xs.sockEventsLock.Unlock() + for _, c := range sEvts { + c.Func(c.PrivateData, data) } return nil }) } else { - err = xs.ioSock.On(evn, f) + err = xs.ioSock.On(evn, func(data interface{}) error { + xs.sockEventsLock.Lock() + sEvts := make([]*caller, len(xs.sockEvents[evn])) + copy(sEvts, xs.sockEvents[evn]) + xs.sockEventsLock.Unlock() + for _, c := range sEvts { + c.Func(c.PrivateData, data) + } + return nil + }) } if err != nil { return uuid.Nil, err @@ -348,9 +363,10 @@ func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, err } c := &caller{ - id: uuid.NewV1(), - EventName: evName, - Func: f, + id: uuid.NewV1(), + EventName: evName, + Func: f, + PrivateData: privData, } xs.sockEvents[evName] = append(xs.sockEvents[evName], c) diff --git a/lib/apiv1/events.go b/lib/apiv1/events.go index 8bad394..da9a2af 100644 --- a/lib/apiv1/events.go +++ b/lib/apiv1/events.go @@ -18,11 +18,11 @@ const ( EventTypePrefix = "event:" // following by event type // Supported Events type - EVTAll = "all" - EVTServerConfig = "server-config" // data type apiv1.ServerCfg - EVTProjectAdd = "project-add" // data type apiv1.ProjectConfig - EVTProjectDelete = "project-delete" // data type apiv1.ProjectConfig - EVTProjectChange = "project-state-change" // data type apiv1.ProjectConfig + EVTAll = EventTypePrefix + "all" + EVTServerConfig = EventTypePrefix + "server-config" // data type apiv1.ServerCfg + EVTProjectAdd = EventTypePrefix + "project-add" // data type apiv1.ProjectConfig + EVTProjectDelete = EventTypePrefix + "project-delete" // data type apiv1.ProjectConfig + EVTProjectChange = EventTypePrefix + "project-state-change" // data type apiv1.ProjectConfig ) // EventMsg Message send