Fixed and improved events management.
[src/xds/xds-server.git] / lib / syncthing / stEvent.go
index bf2a809..948f88a 100644 (file)
@@ -1,3 +1,20 @@
+/*
+ * Copyright (C) 2017 "IoT.bzh"
+ * Author Sebastien Douheret <sebastien@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package st
 
 import (
@@ -9,6 +26,8 @@ import (
        "time"
 
        "github.com/Sirupsen/logrus"
+       uuid "github.com/satori/go.uuid"
+       "github.com/syncthing/syncthing/lib/sync"
 )
 
 // Events .
@@ -20,6 +39,7 @@ type Events struct {
        st    *SyncThing
        log   *logrus.Logger
        cbArr map[string][]cbMap
+       mutex sync.Mutex
 }
 
 type Event struct {
@@ -58,7 +78,7 @@ type STEvent struct {
 }
 
 type cbMap struct {
-       id       int
+       id       string
        cb       EventsCB
        filterID string
        data     *EventsCBData
@@ -74,6 +94,7 @@ func (s *SyncThing) NewEventListener() *Events {
                st:          s,
                log:         s.log,
                cbArr:       make(map[string][]cbMap),
+               mutex:       sync.NewMutex(),
        }
 }
 
@@ -89,21 +110,24 @@ func (e *Events) Stop() {
 }
 
 // Register Add a listener on an event
-func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (int, error) {
+func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (string, error) {
        if evName == "" || !strings.Contains(EventsAll, evName) {
-               return -1, fmt.Errorf("Unknown event name")
+               return "", fmt.Errorf("Unknown event name")
        }
        if data == nil {
                data = &EventsCBData{}
        }
 
+       e.mutex.Lock()
+       defer e.mutex.Unlock()
+
        cbList := []cbMap{}
        if _, ok := e.cbArr[evName]; ok {
                cbList = e.cbArr[evName]
        }
 
-       id := len(cbList)
-       (*data)["id"] = strconv.Itoa(id)
+       id := uuid.NewV1().String()
+       (*data)["id"] = id
 
        e.cbArr[evName] = append(cbList, cbMap{id: id, cb: cb, filterID: filterID, data: data})
 
@@ -111,19 +135,23 @@ func (e *Events) Register(evName string, cb EventsCB, filterID string, data *Eve
 }
 
 // UnRegister Remove a listener event
-func (e *Events) UnRegister(evName string, id int) error {
-       cbKey, ok := e.cbArr[evName]
-       if !ok {
-               return fmt.Errorf("No event registered to such name")
-       }
-
-       // FIXME - NOT TESTED
-       if id >= len(cbKey) {
-               return fmt.Errorf("Invalid id")
-       } else if id == len(cbKey) {
-               e.cbArr[evName] = cbKey[:id-1]
-       } else {
-               e.cbArr[evName] = cbKey[id : id+1]
+func (e *Events) UnRegister(id string) error {
+       e.mutex.Lock()
+       defer e.mutex.Unlock()
+
+       for evName, cbKey := range e.cbArr {
+               newCbList := []cbMap{}
+               change := false
+               for _, k := range cbKey {
+                       if k.id != id {
+                               newCbList = append(newCbList, k)
+                       } else {
+                               change = true
+                       }
+               }
+               if change {
+                       e.cbArr[evName] = newCbList
+               }
        }
 
        return nil
@@ -148,6 +176,8 @@ func (e *Events) getEvents(since int) ([]STEvent, error) {
 func (e *Events) monitorLoop() {
        e.log.Infof("Event monitoring running...")
        since := 0
+       cntErrConn := 0
+       cntErrRetry := 1
        for {
                select {
                case <-e.stop:
@@ -155,11 +185,32 @@ func (e *Events) monitorLoop() {
                        return
 
                case <-time.After(e.MonitorTime * time.Millisecond):
+
+                       if !e.st.Connected {
+                               cntErrConn++
+                               time.Sleep(time.Second)
+                               if cntErrConn > cntErrRetry {
+                                       e.log.Error("ST Event monitor: ST connection down")
+                                       cntErrConn = 0
+                                       cntErrRetry *= 2
+                                       if _, err := e.getEvents(since); err == nil {
+                                               e.st.Connected = true
+                                               cntErrRetry = 1
+                                               // XXX - should we reset since value ?
+                                               goto readEvent
+                                       }
+                               }
+                               continue
+                       }
+
+               readEvent:
                        stEvArr, err := e.getEvents(since)
                        if err != nil {
                                e.log.Errorf("Syncthing Get Events: %v", err)
+                               e.st.Connected = false
                                continue
                        }
+
                        // Process events
                        for _, stEv := range stEvArr {
                                since = stEv.SubscriptionID
@@ -167,8 +218,10 @@ func (e *Events) monitorLoop() {
                                        e.log.Warnf("ST EVENT: %d %s\n  %v", stEv.GlobalID, stEv.Type, stEv)
                                }
 
+                               e.mutex.Lock()
                                cbKey, ok := e.cbArr[stEv.Type]
                                if !ok {
+                                       e.mutex.Unlock()
                                        continue
                                }
 
@@ -224,6 +277,8 @@ func (e *Events) monitorLoop() {
                                                c.cb(evData, c.data)
                                        }
                                }
+
+                               e.mutex.Unlock()
                        }
                }
        }