X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=lib%2Fsyncthing%2FstEvent.go;h=948f88a6fb45e52d58ba21481cf29d2fb792a071;hb=285332c351777b74abca638b8b2a2cde3c68edc6;hp=bf2a809e28be0c51484c7e09451c4f59aae06096;hpb=8f44cc7217ce48f3f94c8ea3f037cdf011c4493b;p=src%2Fxds%2Fxds-server.git diff --git a/lib/syncthing/stEvent.go b/lib/syncthing/stEvent.go index bf2a809..948f88a 100644 --- a/lib/syncthing/stEvent.go +++ b/lib/syncthing/stEvent.go @@ -1,3 +1,20 @@ +/* + * Copyright (C) 2017 "IoT.bzh" + * Author Sebastien Douheret + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package st import ( @@ -9,6 +26,8 @@ import ( "time" "github.com/Sirupsen/logrus" + uuid "github.com/satori/go.uuid" + "github.com/syncthing/syncthing/lib/sync" ) // Events . @@ -20,6 +39,7 @@ type Events struct { st *SyncThing log *logrus.Logger cbArr map[string][]cbMap + mutex sync.Mutex } type Event struct { @@ -58,7 +78,7 @@ type STEvent struct { } type cbMap struct { - id int + id string cb EventsCB filterID string data *EventsCBData @@ -74,6 +94,7 @@ func (s *SyncThing) NewEventListener() *Events { st: s, log: s.log, cbArr: make(map[string][]cbMap), + mutex: sync.NewMutex(), } } @@ -89,21 +110,24 @@ func (e *Events) Stop() { } // Register Add a listener on an event -func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (int, error) { +func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (string, error) { if evName == "" || !strings.Contains(EventsAll, evName) { - return -1, fmt.Errorf("Unknown event name") + return "", fmt.Errorf("Unknown event name") } if data == nil { data = &EventsCBData{} } + e.mutex.Lock() + defer e.mutex.Unlock() + cbList := []cbMap{} if _, ok := e.cbArr[evName]; ok { cbList = e.cbArr[evName] } - id := len(cbList) - (*data)["id"] = strconv.Itoa(id) + id := uuid.NewV1().String() + (*data)["id"] = id e.cbArr[evName] = append(cbList, cbMap{id: id, cb: cb, filterID: filterID, data: data}) @@ -111,19 +135,23 @@ func (e *Events) Register(evName string, cb EventsCB, filterID string, data *Eve } // UnRegister Remove a listener event -func (e *Events) UnRegister(evName string, id int) error { - cbKey, ok := e.cbArr[evName] - if !ok { - return fmt.Errorf("No event registered to such name") - } - - // FIXME - NOT TESTED - if id >= len(cbKey) { - return fmt.Errorf("Invalid id") - } else if id == len(cbKey) { - e.cbArr[evName] = cbKey[:id-1] - } else { - e.cbArr[evName] = cbKey[id : id+1] +func (e *Events) UnRegister(id string) error { + e.mutex.Lock() + defer e.mutex.Unlock() + + for evName, cbKey := range e.cbArr { + newCbList := []cbMap{} + change := false + for _, k := range cbKey { + if k.id != id { + newCbList = append(newCbList, k) + } else { + change = true + } + } + if change { + e.cbArr[evName] = newCbList + } } return nil @@ -148,6 +176,8 @@ func (e *Events) getEvents(since int) ([]STEvent, error) { func (e *Events) monitorLoop() { e.log.Infof("Event monitoring running...") since := 0 + cntErrConn := 0 + cntErrRetry := 1 for { select { case <-e.stop: @@ -155,11 +185,32 @@ func (e *Events) monitorLoop() { return case <-time.After(e.MonitorTime * time.Millisecond): + + if !e.st.Connected { + cntErrConn++ + time.Sleep(time.Second) + if cntErrConn > cntErrRetry { + e.log.Error("ST Event monitor: ST connection down") + cntErrConn = 0 + cntErrRetry *= 2 + if _, err := e.getEvents(since); err == nil { + e.st.Connected = true + cntErrRetry = 1 + // XXX - should we reset since value ? + goto readEvent + } + } + continue + } + + readEvent: stEvArr, err := e.getEvents(since) if err != nil { e.log.Errorf("Syncthing Get Events: %v", err) + e.st.Connected = false continue } + // Process events for _, stEv := range stEvArr { since = stEv.SubscriptionID @@ -167,8 +218,10 @@ func (e *Events) monitorLoop() { e.log.Warnf("ST EVENT: %d %s\n %v", stEv.GlobalID, stEv.Type, stEv) } + e.mutex.Lock() cbKey, ok := e.cbArr[stEv.Type] if !ok { + e.mutex.Unlock() continue } @@ -224,6 +277,8 @@ func (e *Events) monitorLoop() { c.cb(evData, c.data) } } + + e.mutex.Unlock() } } }