Fixed events definition and callback processing.
authorSebastien Douheret <sebastien.douheret@iot.bzh>
Wed, 8 Nov 2017 09:19:54 +0000 (10:19 +0100)
committerSebastien Douheret <sebastien.douheret@iot.bzh>
Wed, 8 Nov 2017 09:19:54 +0000 (10:19 +0100)
lib/agent/apiv1-exec.go
lib/agent/events.go
lib/agent/project-st.go
lib/agent/xdsserver.go
lib/apiv1/events.go

index 9c65bc2..c199267 100644 (file)
@@ -12,6 +12,7 @@ import (
 )
 
 var execCmdID = 1
+var fwdFuncID []uuid.UUID
 
 // ExecCmd executes remotely a command
 func (s *APIService) execCmd(c *gin.Context) {
@@ -38,11 +39,15 @@ func (s *APIService) _execRequest(cmd string, c *gin.Context) {
        }
 
        // First get Project ID to retrieve Server ID and send command to right server
-       id := c.Param("id")
-       if id == "" {
-               id = args.ID
+       iid := c.Param("id")
+       if iid == "" {
+               iid = args.ID
+       }
+       id, err := s.projects.ResolveID(iid)
+       if err != nil {
+               common.APIError(c, err.Error())
+               return
        }
-
        prj := s.projects.Get(id)
        if prj == nil {
                common.APIError(c, "Unknown id")
@@ -75,15 +80,23 @@ func (s *APIService) _execRequest(cmd string, c *gin.Context) {
                apiv1.ExecInferiorInEvent,
                apiv1.ExecInferiorOutEvent,
        }
-       so := *sock
-       fwdFuncID := []uuid.UUID{}
+
        for _, evName := range evtList {
                evN := evName
-               fwdFunc := func(evData interface{}) {
+               fwdFunc := func(pData interface{}, evData interface{}) error {
+                       sid := pData.(string)
+                       // IO socket can be nil when disconnected
+                       so := s.sessions.IOSocketGet(sid)
+                       if so == nil {
+                               s.Log.Infof("%s not emitted: WS closed (sid:%s)", evN, sid)
+                               return nil
+                       }
+
                        // Forward event to Client/Dashboard
-                       so.Emit(evN, evData)
+                       (*so).Emit(evN, evData)
+                       return nil
                }
-               id, err := svr.EventOn(evN, fwdFunc)
+               id, err := svr.EventOn(evN, sess.ID, fwdFunc)
                if err != nil {
                        common.APIError(c, err.Error())
                        return
@@ -93,16 +106,28 @@ func (s *APIService) _execRequest(cmd string, c *gin.Context) {
 
        // Handle Exit event separately to cleanup registered listener
        var exitFuncID uuid.UUID
-       exitFunc := func(evData interface{}) {
-               so.Emit(apiv1.ExecExitEvent, evData)
+       exitFunc := func(pData interface{}, evData interface{}) error {
+               evN := apiv1.ExecExitEvent
+               sid := pData.(string)
+
+               // IO socket can be nil when disconnected
+               so := s.sessions.IOSocketGet(sid)
+               if so == nil {
+                       s.Log.Infof("%s not emitted: WS closed (sid:%s)", evN, sid)
+                       return nil
+               }
+
+               (*so).Emit(evN, evData)
 
                // cleanup listener
                for i, evName := range evtList {
                        svr.EventOff(evName, fwdFuncID[i])
                }
-               svr.EventOff(apiv1.ExecExitEvent, exitFuncID)
+               svr.EventOff(evN, exitFuncID)
+
+               return nil
        }
-       exitFuncID, err = svr.EventOn(apiv1.ExecExitEvent, exitFunc)
+       exitFuncID, err = svr.EventOn(apiv1.ExecExitEvent, sess.ID, exitFunc)
        if err != nil {
                common.APIError(c, err.Error())
                return
index 046c377..2684ff5 100644 (file)
@@ -104,8 +104,9 @@ func (e *Events) Emit(evName string, data interface{}) error {
                        Type: evName,
                        Data: data,
                }
-               if err := (*so).Emit(apiv1.EventTypePrefix+evName, msg); err != nil {
-                       e.Log.Errorf("WS Emit %v error : %v", apiv1.EventTypePrefix+evName, err)
+               e.Log.Debugf("Emit Event %s: %v", evName, sid)
+               if err := (*so).Emit(evName, msg); err != nil {
+                       e.Log.Errorf("WS Emit %v error : %v", evName, err)
                        if firstErr == nil {
                                firstErr = err
                        }
index dba5978..cd55656 100644 (file)
@@ -85,7 +85,7 @@ func (p *STProject) UpdateProject(prj apiv1.ProjectConfig) (*apiv1.ProjectConfig
 
        // Register events to update folder status
        // Register to XDS Server events
-       p.server.EventOn("event:FolderStateChanged", p._cbServerFolderChanged)
+       p.server.EventOn("event:FolderStateChanged", "", p._cbServerFolderChanged)
        if err := p.server.EventRegister("FolderStateChanged", svrPrj.ID); err != nil {
                p.Log.Warningf("XDS Server EventRegister failed: %v", err)
                return svrPrj, err
@@ -128,12 +128,12 @@ func (p *STProject) IsInSync() (bool, error) {
 
 // callback use to update (XDS Server) folder IsInSync status
 
-func (p *STProject) _cbServerFolderChanged(data interface{}) {
+func (p *STProject) _cbServerFolderChanged(pData interface{}, data interface{}) error {
        evt := data.(XdsEventFolderChange)
 
        // Only process event that concerns this project/folder ID
        if p.folder.ID != evt.Folder.ID {
-               return
+               return nil
        }
 
        if evt.Folder.IsInSync != p.folder.DataCloudSync.STSvrIsInSync ||
@@ -146,6 +146,7 @@ func (p *STProject) _cbServerFolderChanged(data interface{}) {
                        p.Log.Warningf("Cannot notify project change: %v", err)
                }
        }
+       return nil
 }
 
 // callback use to update IsInSync status
index 2037094..73a5bd9 100644 (file)
@@ -112,11 +112,15 @@ type XdsEventFolderChange struct {
        Folder XdsFolderConfig `json:"folder"`
 }
 
+// Event emitter callback
+type EventCB func(privData interface{}, evtData interface{}) error
+
 // caller Used to chain event listeners
 type caller struct {
-       id        uuid.UUID
-       EventName string
-       Func      func(interface{})
+       id          uuid.UUID
+       EventName   string
+       Func        EventCB
+       PrivateData interface{}
 }
 
 const _IDTempoPrefix = "tempo-"
@@ -316,7 +320,7 @@ func (xs *XdsServer) EventRegister(evName string, id string) error {
 }
 
 // EventOn Register a callback on events reception
-func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, error) {
+func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
        if xs.ioSock == nil {
                return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
        }
@@ -333,14 +337,25 @@ func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, err
                if evName == "event:FolderStateChanged" {
                        err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
                                xs.sockEventsLock.Lock()
-                               defer xs.sockEventsLock.Unlock()
-                               for _, c := range xs.sockEvents[evn] {
-                                       c.Func(data)
+                               sEvts := make([]*caller, len(xs.sockEvents[evn]))
+                               copy(sEvts, xs.sockEvents[evn])
+                               xs.sockEventsLock.Unlock()
+                               for _, c := range sEvts {
+                                       c.Func(c.PrivateData, data)
                                }
                                return nil
                        })
                } else {
-                       err = xs.ioSock.On(evn, f)
+                       err = xs.ioSock.On(evn, func(data interface{}) error {
+                               xs.sockEventsLock.Lock()
+                               sEvts := make([]*caller, len(xs.sockEvents[evn]))
+                               copy(sEvts, xs.sockEvents[evn])
+                               xs.sockEventsLock.Unlock()
+                               for _, c := range sEvts {
+                                       c.Func(c.PrivateData, data)
+                               }
+                               return nil
+                       })
                }
                if err != nil {
                        return uuid.Nil, err
@@ -348,9 +363,10 @@ func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, err
        }
 
        c := &caller{
-               id:        uuid.NewV1(),
-               EventName: evName,
-               Func:      f,
+               id:          uuid.NewV1(),
+               EventName:   evName,
+               Func:        f,
+               PrivateData: privData,
        }
 
        xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
index 8bad394..da9a2af 100644 (file)
@@ -18,11 +18,11 @@ const (
        EventTypePrefix = "event:" // following by event type
 
        // Supported Events type
-       EVTAll           = "all"
-       EVTServerConfig  = "server-config"        // data type apiv1.ServerCfg
-       EVTProjectAdd    = "project-add"          // data type apiv1.ProjectConfig
-       EVTProjectDelete = "project-delete"       // data type apiv1.ProjectConfig
-       EVTProjectChange = "project-state-change" // data type apiv1.ProjectConfig
+       EVTAll           = EventTypePrefix + "all"
+       EVTServerConfig  = EventTypePrefix + "server-config"        // data type apiv1.ServerCfg
+       EVTProjectAdd    = EventTypePrefix + "project-add"          // data type apiv1.ProjectConfig
+       EVTProjectDelete = EventTypePrefix + "project-delete"       // data type apiv1.ProjectConfig
+       EVTProjectChange = EventTypePrefix + "project-state-change" // data type apiv1.ProjectConfig
 )
 
 // EventMsg Message send