Code Review
/
src
/
xds
/
xds-agent.git
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
review
|
tree
raw
|
patch
|
inline
| side by side (parent:
d96e5b5
)
Fixed events definition and callback processing.
author
Sebastien Douheret
<sebastien.douheret@iot.bzh>
Wed, 8 Nov 2017 09:19:54 +0000
(10:19 +0100)
committer
Sebastien Douheret
<sebastien.douheret@iot.bzh>
Wed, 8 Nov 2017 09:19:54 +0000
(10:19 +0100)
lib/agent/apiv1-exec.go
patch
|
blob
|
history
lib/agent/events.go
patch
|
blob
|
history
lib/agent/project-st.go
patch
|
blob
|
history
lib/agent/xdsserver.go
patch
|
blob
|
history
lib/apiv1/events.go
patch
|
blob
|
history
diff --git
a/lib/agent/apiv1-exec.go
b/lib/agent/apiv1-exec.go
index
9c65bc2
..
c199267
100644
(file)
--- a/
lib/agent/apiv1-exec.go
+++ b/
lib/agent/apiv1-exec.go
@@
-12,6
+12,7
@@
import (
)
var execCmdID = 1
)
var execCmdID = 1
+var fwdFuncID []uuid.UUID
// ExecCmd executes remotely a command
func (s *APIService) execCmd(c *gin.Context) {
// ExecCmd executes remotely a command
func (s *APIService) execCmd(c *gin.Context) {
@@
-38,11
+39,15
@@
func (s *APIService) _execRequest(cmd string, c *gin.Context) {
}
// First get Project ID to retrieve Server ID and send command to right server
}
// First get Project ID to retrieve Server ID and send command to right server
- id := c.Param("id")
- if id == "" {
- id = args.ID
+ iid := c.Param("id")
+ if iid == "" {
+ iid = args.ID
+ }
+ id, err := s.projects.ResolveID(iid)
+ if err != nil {
+ common.APIError(c, err.Error())
+ return
}
}
-
prj := s.projects.Get(id)
if prj == nil {
common.APIError(c, "Unknown id")
prj := s.projects.Get(id)
if prj == nil {
common.APIError(c, "Unknown id")
@@
-75,15
+80,23
@@
func (s *APIService) _execRequest(cmd string, c *gin.Context) {
apiv1.ExecInferiorInEvent,
apiv1.ExecInferiorOutEvent,
}
apiv1.ExecInferiorInEvent,
apiv1.ExecInferiorOutEvent,
}
- so := *sock
- fwdFuncID := []uuid.UUID{}
+
for _, evName := range evtList {
evN := evName
for _, evName := range evtList {
evN := evName
- fwdFunc := func(evData interface{}) {
+ fwdFunc := func(pData interface{}, evData interface{}) error {
+ sid := pData.(string)
+ // IO socket can be nil when disconnected
+ so := s.sessions.IOSocketGet(sid)
+ if so == nil {
+ s.Log.Infof("%s not emitted: WS closed (sid:%s)", evN, sid)
+ return nil
+ }
+
// Forward event to Client/Dashboard
// Forward event to Client/Dashboard
- so.Emit(evN, evData)
+ (*so).Emit(evN, evData)
+ return nil
}
}
- id, err := svr.EventOn(evN, fwdFunc)
+ id, err := svr.EventOn(evN,
sess.ID,
fwdFunc)
if err != nil {
common.APIError(c, err.Error())
return
if err != nil {
common.APIError(c, err.Error())
return
@@
-93,16
+106,28
@@
func (s *APIService) _execRequest(cmd string, c *gin.Context) {
// Handle Exit event separately to cleanup registered listener
var exitFuncID uuid.UUID
// Handle Exit event separately to cleanup registered listener
var exitFuncID uuid.UUID
- exitFunc := func(evData interface{}) {
- so.Emit(apiv1.ExecExitEvent, evData)
+ exitFunc := func(pData interface{}, evData interface{}) error {
+ evN := apiv1.ExecExitEvent
+ sid := pData.(string)
+
+ // IO socket can be nil when disconnected
+ so := s.sessions.IOSocketGet(sid)
+ if so == nil {
+ s.Log.Infof("%s not emitted: WS closed (sid:%s)", evN, sid)
+ return nil
+ }
+
+ (*so).Emit(evN, evData)
// cleanup listener
for i, evName := range evtList {
svr.EventOff(evName, fwdFuncID[i])
}
// cleanup listener
for i, evName := range evtList {
svr.EventOff(evName, fwdFuncID[i])
}
- svr.EventOff(apiv1.ExecExitEvent, exitFuncID)
+ svr.EventOff(evN, exitFuncID)
+
+ return nil
}
}
- exitFuncID, err = svr.EventOn(apiv1.ExecExitEvent, exitFunc)
+ exitFuncID, err = svr.EventOn(apiv1.ExecExitEvent,
sess.ID,
exitFunc)
if err != nil {
common.APIError(c, err.Error())
return
if err != nil {
common.APIError(c, err.Error())
return
diff --git
a/lib/agent/events.go
b/lib/agent/events.go
index
046c377
..
2684ff5
100644
(file)
--- a/
lib/agent/events.go
+++ b/
lib/agent/events.go
@@
-104,8
+104,9
@@
func (e *Events) Emit(evName string, data interface{}) error {
Type: evName,
Data: data,
}
Type: evName,
Data: data,
}
- if err := (*so).Emit(apiv1.EventTypePrefix+evName, msg); err != nil {
- e.Log.Errorf("WS Emit %v error : %v", apiv1.EventTypePrefix+evName, err)
+ e.Log.Debugf("Emit Event %s: %v", evName, sid)
+ if err := (*so).Emit(evName, msg); err != nil {
+ e.Log.Errorf("WS Emit %v error : %v", evName, err)
if firstErr == nil {
firstErr = err
}
if firstErr == nil {
firstErr = err
}
diff --git
a/lib/agent/project-st.go
b/lib/agent/project-st.go
index
dba5978
..
cd55656
100644
(file)
--- a/
lib/agent/project-st.go
+++ b/
lib/agent/project-st.go
@@
-85,7
+85,7
@@
func (p *STProject) UpdateProject(prj apiv1.ProjectConfig) (*apiv1.ProjectConfig
// Register events to update folder status
// Register to XDS Server events
// Register events to update folder status
// Register to XDS Server events
- p.server.EventOn("event:FolderStateChanged", p._cbServerFolderChanged)
+ p.server.EventOn("event:FolderStateChanged",
"",
p._cbServerFolderChanged)
if err := p.server.EventRegister("FolderStateChanged", svrPrj.ID); err != nil {
p.Log.Warningf("XDS Server EventRegister failed: %v", err)
return svrPrj, err
if err := p.server.EventRegister("FolderStateChanged", svrPrj.ID); err != nil {
p.Log.Warningf("XDS Server EventRegister failed: %v", err)
return svrPrj, err
@@
-128,12
+128,12
@@
func (p *STProject) IsInSync() (bool, error) {
// callback use to update (XDS Server) folder IsInSync status
// callback use to update (XDS Server) folder IsInSync status
-func (p *STProject) _cbServerFolderChanged(
data interface{})
{
+func (p *STProject) _cbServerFolderChanged(
pData interface{}, data interface{}) error
{
evt := data.(XdsEventFolderChange)
// Only process event that concerns this project/folder ID
if p.folder.ID != evt.Folder.ID {
evt := data.(XdsEventFolderChange)
// Only process event that concerns this project/folder ID
if p.folder.ID != evt.Folder.ID {
- return
+ return
nil
}
if evt.Folder.IsInSync != p.folder.DataCloudSync.STSvrIsInSync ||
}
if evt.Folder.IsInSync != p.folder.DataCloudSync.STSvrIsInSync ||
@@
-146,6
+146,7
@@
func (p *STProject) _cbServerFolderChanged(data interface{}) {
p.Log.Warningf("Cannot notify project change: %v", err)
}
}
p.Log.Warningf("Cannot notify project change: %v", err)
}
}
+ return nil
}
// callback use to update IsInSync status
}
// callback use to update IsInSync status
diff --git
a/lib/agent/xdsserver.go
b/lib/agent/xdsserver.go
index
2037094
..
73a5bd9
100644
(file)
--- a/
lib/agent/xdsserver.go
+++ b/
lib/agent/xdsserver.go
@@
-112,11
+112,15
@@
type XdsEventFolderChange struct {
Folder XdsFolderConfig `json:"folder"`
}
Folder XdsFolderConfig `json:"folder"`
}
+// Event emitter callback
+type EventCB func(privData interface{}, evtData interface{}) error
+
// caller Used to chain event listeners
type caller struct {
// caller Used to chain event listeners
type caller struct {
- id uuid.UUID
- EventName string
- Func func(interface{})
+ id uuid.UUID
+ EventName string
+ Func EventCB
+ PrivateData interface{}
}
const _IDTempoPrefix = "tempo-"
}
const _IDTempoPrefix = "tempo-"
@@
-316,7
+320,7
@@
func (xs *XdsServer) EventRegister(evName string, id string) error {
}
// EventOn Register a callback on events reception
}
// EventOn Register a callback on events reception
-func (xs *XdsServer) EventOn(evName string,
f func(interface{})
) (uuid.UUID, error) {
+func (xs *XdsServer) EventOn(evName string,
privData interface{}, f EventCB
) (uuid.UUID, error) {
if xs.ioSock == nil {
return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
}
if xs.ioSock == nil {
return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
}
@@
-333,14
+337,25
@@
func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, err
if evName == "event:FolderStateChanged" {
err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
xs.sockEventsLock.Lock()
if evName == "event:FolderStateChanged" {
err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
xs.sockEventsLock.Lock()
- defer xs.sockEventsLock.Unlock()
- for _, c := range xs.sockEvents[evn] {
- c.Func(data)
+ sEvts := make([]*caller, len(xs.sockEvents[evn]))
+ copy(sEvts, xs.sockEvents[evn])
+ xs.sockEventsLock.Unlock()
+ for _, c := range sEvts {
+ c.Func(c.PrivateData, data)
}
return nil
})
} else {
}
return nil
})
} else {
- err = xs.ioSock.On(evn, f)
+ err = xs.ioSock.On(evn, func(data interface{}) error {
+ xs.sockEventsLock.Lock()
+ sEvts := make([]*caller, len(xs.sockEvents[evn]))
+ copy(sEvts, xs.sockEvents[evn])
+ xs.sockEventsLock.Unlock()
+ for _, c := range sEvts {
+ c.Func(c.PrivateData, data)
+ }
+ return nil
+ })
}
if err != nil {
return uuid.Nil, err
}
if err != nil {
return uuid.Nil, err
@@
-348,9
+363,10
@@
func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, err
}
c := &caller{
}
c := &caller{
- id: uuid.NewV1(),
- EventName: evName,
- Func: f,
+ id: uuid.NewV1(),
+ EventName: evName,
+ Func: f,
+ PrivateData: privData,
}
xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
}
xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
diff --git
a/lib/apiv1/events.go
b/lib/apiv1/events.go
index
8bad394
..
da9a2af
100644
(file)
--- a/
lib/apiv1/events.go
+++ b/
lib/apiv1/events.go
@@
-18,11
+18,11
@@
const (
EventTypePrefix = "event:" // following by event type
// Supported Events type
EventTypePrefix = "event:" // following by event type
// Supported Events type
- EVTAll = "all"
- EVTServerConfig = "server-config" // data type apiv1.ServerCfg
- EVTProjectAdd = "project-add" // data type apiv1.ProjectConfig
- EVTProjectDelete = "project-delete" // data type apiv1.ProjectConfig
- EVTProjectChange = "project-state-change" // data type apiv1.ProjectConfig
+ EVTAll =
EventTypePrefix +
"all"
+ EVTServerConfig =
EventTypePrefix +
"server-config" // data type apiv1.ServerCfg
+ EVTProjectAdd =
EventTypePrefix +
"project-add" // data type apiv1.ProjectConfig
+ EVTProjectDelete =
EventTypePrefix +
"project-delete" // data type apiv1.ProjectConfig
+ EVTProjectChange =
EventTypePrefix +
"project-state-change" // data type apiv1.ProjectConfig
)
// EventMsg Message send
)
// EventMsg Message send