+/*
+ * Copyright (C) 2017 "IoT.bzh"
+ * Author Sebastien Douheret <sebastien@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package agent
import (
ioSock *sio_client.Client
logOut io.Writer
apiRouter *gin.RouterGroup
+ cmdList map[string]interface{}
}
// EventCB Event emitter callback
sockEvents: make(map[string][]*caller),
sockEventsLock: &sync.Mutex{},
logOut: ctx.Log.Out,
+ cmdList: make(map[string]interface{}),
}
}
// Close Free and close XDS Server connection
func (xs *XdsServer) Close() error {
- xs.Connected = false
+ err := xs._Disconnected()
xs.Disabled = true
- xs.ioSock = nil
- xs._NotifyState()
- return nil
+ return err
}
// Connect Establish HTTP connection with XDS Server
time.Sleep(time.Second)
}
if retry == 0 {
- // FIXME: re-use _reconnect to wait longer in background
+ // FIXME: re-use _Reconnect to wait longer in background
return fmt.Errorf("Connection to XDS Server failure")
}
if err != nil {
}
// Check HTTP connection and establish WS connection
- err = xs._connect(false)
+ err = xs._Connect(false)
return err
}
return xs.client.Put("/folders/"+fld.ID, fld, resFld)
}
+// CommandExec Send POST request to execute a command
+func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error {
+ return xs.client.Post("/exec", args, res)
+}
+
+// CommandSignal Send POST request to send a signal to a command
+func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error {
+ return xs.client.Post("/signal", args, res)
+}
+
// SetAPIRouterGroup .
func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
xs.apiRouter = r
// Send Get request
if err := xs.client.Get(nURL, &data); err != nil {
if strings.Contains(err.Error(), "connection refused") {
- xs.Connected = false
- xs._NotifyState()
+ xs._Disconnected()
}
common.APIError(c, err.Error())
return
nil)
}
+// EventEmit Emit a event to XDS Server through WS
+func (xs *XdsServer) EventEmit(message string, args ...interface{}) error {
+ if xs.ioSock == nil {
+ return fmt.Errorf("Io.Socket not initialized")
+ }
+
+ return xs.ioSock.Emit(message, args...)
+}
+
// EventOn Register a callback on events reception
func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
if xs.ioSock == nil {
}
xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
+ xs.LogSillyf("XS EventOn: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
return c.id, nil
}
}
}
}
+ xs.LogSillyf("XS EventOff: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
return nil
}
return pPrj
}
+// CommandAdd Add a new command to the list of running commands
+func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error {
+ if xs.CommandGet(cmdID) != nil {
+ return fmt.Errorf("command id already exist")
+ }
+ xs.cmdList[cmdID] = data
+ return nil
+}
+
+// CommandDelete Delete a command from the command list
+func (xs *XdsServer) CommandDelete(cmdID string) error {
+ if xs.CommandGet(cmdID) == nil {
+ return fmt.Errorf("unknown command id")
+ }
+ delete(xs.cmdList, cmdID)
+ return nil
+}
+
+// CommandGet Retrieve a command data
+func (xs *XdsServer) CommandGet(cmdID string) interface{} {
+ d, exist := xs.cmdList[cmdID]
+ if exist {
+ return d
+ }
+ return nil
+}
+
/***
** Private functions
***/
return nil
}
-// Re-established connection
-func (xs *XdsServer) _reconnect() error {
- err := xs._connect(true)
+// _Reconnect Re-established connection
+func (xs *XdsServer) _Reconnect() error {
+ err := xs._Connect(true)
if err == nil {
// Reload projects list for this server
err = xs.projects.Init(xs)
return err
}
-// Established HTTP and WS connection and retrieve XDSServer config
-func (xs *XdsServer) _connect(reConn bool) error {
+// _Connect Established HTTP and WS connection and retrieve XDSServer config
+func (xs *XdsServer) _Connect(reConn bool) error {
xdsCfg := xsapiv1.APIConfig{}
if err := xs.client.Get("/config", &xdsCfg); err != nil {
// Establish WS connection and register listen
if err := xs._SocketConnect(); err != nil {
- xs.Connected = false
- xs._NotifyState()
+ xs._Disconnected()
return err
}
return nil
}
-// Create WebSocket (io.socket) connection
+// _SocketConnect Create WebSocket (io.socket) connection
func (xs *XdsServer) _SocketConnect() error {
xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
if xs.CBOnDisconnect != nil {
xs.CBOnDisconnect(err)
}
- xs.Connected = false
- xs._NotifyState()
+ xs._Disconnected()
// Try to reconnect during 15min (or at least while not disabled)
go func() {
time.Sleep(time.Second * time.Duration(waitTime))
xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
- xs._reconnect()
+ err := xs._Reconnect()
+ if err != nil &&
+ !(strings.Contains(err.Error(), "dial tcp") && strings.Contains(err.Error(), "connection refused")) {
+ xs.Log.Errorf("ERROR while reconnecting: %v", err.Error())
+ }
+
}
}()
})
return nil
}
-// Send event to notify changes
+// _Disconnected Set XDS Server as disconnected
+func (xs *XdsServer) _Disconnected() error {
+ // Clear all register events as socket is closed
+ for k := range xs.sockEvents {
+ delete(xs.sockEvents, k)
+ }
+ xs.Connected = false
+ xs.ioSock = nil
+ xs._NotifyState()
+ return nil
+}
+
+// _NotifyState Send event to notify changes
func (xs *XdsServer) _NotifyState() {
evSts := xaapiv1.ServerCfg{