Fixed xds-server folder events detection.
[src/xds/xds-agent.git] / lib / agent / xdsserver.go
index bca4b66..32656cf 100644 (file)
@@ -1,3 +1,20 @@
+/*
+ * Copyright (C) 2017 "IoT.bzh"
+ * Author Sebastien Douheret <sebastien@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package agent
 
 import (
@@ -79,11 +96,9 @@ func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
 
 // Close Free and close XDS Server connection
 func (xs *XdsServer) Close() error {
-       xs.Connected = false
+       err := xs._Disconnected()
        xs.Disabled = true
-       xs.ioSock = nil
-       xs._NotifyState()
-       return nil
+       return err
 }
 
 // Connect Establish HTTP connection with XDS Server
@@ -108,7 +123,7 @@ func (xs *XdsServer) Connect() error {
                time.Sleep(time.Second)
        }
        if retry == 0 {
-               // FIXME: re-use _reconnect to wait longer in background
+               // FIXME: re-use _Reconnect to wait longer in background
                return fmt.Errorf("Connection to XDS Server failure")
        }
        if err != nil {
@@ -116,7 +131,7 @@ func (xs *XdsServer) Connect() error {
        }
 
        // Check HTTP connection and establish WS connection
-       err = xs._connect(false)
+       err = xs._Connect(false)
 
        return err
 }
@@ -206,8 +221,7 @@ func (xs *XdsServer) PassthroughGet(url string) {
                // Send Get request
                if err := xs.client.Get(nURL, &data); err != nil {
                        if strings.Contains(err.Error(), "connection refused") {
-                               xs.Connected = false
-                               xs._NotifyState()
+                               xs._Disconnected()
                        }
                        common.APIError(c, err.Error())
                        return
@@ -293,21 +307,7 @@ func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uu
                // Register listener only the first time
                evn := evName
 
-               // FIXME: use generic type: data interface{} instead of data xsapiv1.EventMsg
-               var err error
-               if evName == "event:folder-state-change" {
-                       err = xs.ioSock.On(evn, func(data xsapiv1.EventMsg) error {
-                               xs.sockEventsLock.Lock()
-                               sEvts := make([]*caller, len(xs.sockEvents[evn]))
-                               copy(sEvts, xs.sockEvents[evn])
-                               xs.sockEventsLock.Unlock()
-                               for _, c := range sEvts {
-                                       c.Func(c.PrivateData, data)
-                               }
-                               return nil
-                       })
-               } else {
-                       err = xs.ioSock.On(evn, func(data interface{}) error {
+               err := xs.ioSock.On(evn, func(data interface{}) error {
                                xs.sockEventsLock.Lock()
                                sEvts := make([]*caller, len(xs.sockEvents[evn]))
                                copy(sEvts, xs.sockEvents[evn])
@@ -317,7 +317,6 @@ func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uu
                                }
                                return nil
                        })
-               }
                if err != nil {
                        return uuid.Nil, err
                }
@@ -331,6 +330,7 @@ func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uu
        }
 
        xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
+       xs.LogSillyf("XS EventOn: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
        return c.id, nil
 }
 
@@ -352,6 +352,7 @@ func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
                        }
                }
        }
+       xs.LogSillyf("XS EventOff: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
        return nil
 }
 
@@ -485,9 +486,9 @@ func (xs *XdsServer) _CreateConnectHTTP() error {
        return nil
 }
 
-//  Re-established connection
-func (xs *XdsServer) _reconnect() error {
-       err := xs._connect(true)
+// _Reconnect Re-established connection
+func (xs *XdsServer) _Reconnect() error {
+       err := xs._Connect(true)
        if err == nil {
                // Reload projects list for this server
                err = xs.projects.Init(xs)
@@ -495,8 +496,8 @@ func (xs *XdsServer) _reconnect() error {
        return err
 }
 
-//  Established HTTP and WS connection and retrieve XDSServer config
-func (xs *XdsServer) _connect(reConn bool) error {
+// _Connect Established HTTP and WS connection and retrieve XDSServer config
+func (xs *XdsServer) _Connect(reConn bool) error {
 
        xdsCfg := xsapiv1.APIConfig{}
        if err := xs.client.Get("/config", &xdsCfg); err != nil {
@@ -517,8 +518,7 @@ func (xs *XdsServer) _connect(reConn bool) error {
 
        // Establish WS connection and register listen
        if err := xs._SocketConnect(); err != nil {
-               xs.Connected = false
-               xs._NotifyState()
+               xs._Disconnected()
                return err
        }
 
@@ -527,7 +527,7 @@ func (xs *XdsServer) _connect(reConn bool) error {
        return nil
 }
 
-// Create WebSocket (io.socket) connection
+// _SocketConnect Create WebSocket (io.socket) connection
 func (xs *XdsServer) _SocketConnect() error {
 
        xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
@@ -558,8 +558,7 @@ func (xs *XdsServer) _SocketConnect() error {
                if xs.CBOnDisconnect != nil {
                        xs.CBOnDisconnect(err)
                }
-               xs.Connected = false
-               xs._NotifyState()
+               xs._Disconnected()
 
                // Try to reconnect during 15min (or at least while not disabled)
                go func() {
@@ -577,7 +576,12 @@ func (xs *XdsServer) _SocketConnect() error {
                                time.Sleep(time.Second * time.Duration(waitTime))
                                xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
 
-                               xs._reconnect()
+                               err := xs._Reconnect()
+                               if err != nil &&
+                                       !(strings.Contains(err.Error(), "dial tcp") && strings.Contains(err.Error(), "connection refused")) {
+                                       xs.Log.Errorf("ERROR while reconnecting: %v", err.Error())
+                               }
+
                        }
                }()
        })
@@ -590,7 +594,19 @@ func (xs *XdsServer) _SocketConnect() error {
        return nil
 }
 
-// Send event to notify changes
+// _Disconnected Set XDS Server as disconnected
+func (xs *XdsServer) _Disconnected() error {
+       // Clear all register events as socket is closed
+       for k := range xs.sockEvents {
+               delete(xs.sockEvents, k)
+       }
+       xs.Connected = false
+       xs.ioSock = nil
+       xs._NotifyState()
+       return nil
+}
+
+// _NotifyState Send event to notify changes
 func (xs *XdsServer) _NotifyState() {
 
        evSts := xaapiv1.ServerCfg{