X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=lib%2Fsyncthing%2FstEvent.go;h=ec11b56e0f894ef2c6161bac7162f19c7901832b;hb=dd877bc8168c1a87a9d05d36f42b333b861c6e7e;hp=bf2a809e28be0c51484c7e09451c4f59aae06096;hpb=8f44cc7217ce48f3f94c8ea3f037cdf011c4493b;p=src%2Fxds%2Fxds-server.git diff --git a/lib/syncthing/stEvent.go b/lib/syncthing/stEvent.go index bf2a809..ec11b56 100644 --- a/lib/syncthing/stEvent.go +++ b/lib/syncthing/stEvent.go @@ -1,3 +1,20 @@ +/* + * Copyright (C) 2017-2018 "IoT.bzh" + * Author Sebastien Douheret + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package st import ( @@ -9,6 +26,8 @@ import ( "time" "github.com/Sirupsen/logrus" + uuid "github.com/satori/go.uuid" + "github.com/syncthing/syncthing/lib/sync" ) // Events . @@ -20,34 +39,47 @@ type Events struct { st *SyncThing log *logrus.Logger cbArr map[string][]cbMap + mutex sync.Mutex } +// Event Syncthing events structure type Event struct { Type string `json:"type"` Time time.Time `json:"time"` Data map[string]string `json:"data"` } +// EventsCBData Data parameter of Event callback type EventsCBData map[string]interface{} + +// EventsCB Event callback type EventsCB func(ev Event, cbData *EventsCBData) const ( + // EventFolderCompletion . EventFolderCompletion string = "FolderCompletion" - EventFolderSummary string = "FolderSummary" - EventFolderPaused string = "FolderPaused" - EventFolderResumed string = "FolderResumed" - EventFolderErrors string = "FolderErrors" - EventStateChanged string = "StateChanged" + // EventFolderSummary . + EventFolderSummary string = "FolderSummary" + // EventFolderPaused . + EventFolderPaused string = "FolderPaused" + // EventFolderResumed . + EventFolderResumed string = "FolderResumed" + // EventFolderErrors . + EventFolderErrors string = "FolderErrors" + // EventStateChanged . + EventStateChanged string = "StateChanged" ) -var EventsAll string = EventFolderCompletion + "|" + +// EventsAll . +var EventsAll = EventFolderCompletion + "|" + EventFolderSummary + "|" + EventFolderPaused + "|" + EventFolderResumed + "|" + EventFolderErrors + "|" + EventStateChanged -type STEvent struct { +// SyncthingEvent Syncthing Event structure definition +type SyncthingEvent struct { // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API SubscriptionID int `json:"id"` // Global ID of the event across all subscriptions @@ -58,7 +90,7 @@ type STEvent struct { } type cbMap struct { - id int + id string cb EventsCB filterID string data *EventsCBData @@ -74,6 +106,7 @@ func (s *SyncThing) NewEventListener() *Events { st: s, log: s.log, cbArr: make(map[string][]cbMap), + mutex: sync.NewMutex(), } } @@ -89,21 +122,24 @@ func (e *Events) Stop() { } // Register Add a listener on an event -func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (int, error) { +func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (string, error) { if evName == "" || !strings.Contains(EventsAll, evName) { - return -1, fmt.Errorf("Unknown event name") + return "", fmt.Errorf("Unknown event name") } if data == nil { data = &EventsCBData{} } + e.mutex.Lock() + defer e.mutex.Unlock() + cbList := []cbMap{} if _, ok := e.cbArr[evName]; ok { cbList = e.cbArr[evName] } - id := len(cbList) - (*data)["id"] = strconv.Itoa(id) + id := uuid.NewV1().String() + (*data)["id"] = id e.cbArr[evName] = append(cbList, cbMap{id: id, cb: cb, filterID: filterID, data: data}) @@ -111,28 +147,32 @@ func (e *Events) Register(evName string, cb EventsCB, filterID string, data *Eve } // UnRegister Remove a listener event -func (e *Events) UnRegister(evName string, id int) error { - cbKey, ok := e.cbArr[evName] - if !ok { - return fmt.Errorf("No event registered to such name") - } - - // FIXME - NOT TESTED - if id >= len(cbKey) { - return fmt.Errorf("Invalid id") - } else if id == len(cbKey) { - e.cbArr[evName] = cbKey[:id-1] - } else { - e.cbArr[evName] = cbKey[id : id+1] +func (e *Events) UnRegister(id string) error { + e.mutex.Lock() + defer e.mutex.Unlock() + + for evName, cbKey := range e.cbArr { + newCbList := []cbMap{} + change := false + for _, k := range cbKey { + if k.id != id { + newCbList = append(newCbList, k) + } else { + change = true + } + } + if change { + e.cbArr[evName] = newCbList + } } return nil } // GetEvents returns the Syncthing events -func (e *Events) getEvents(since int) ([]STEvent, error) { +func (e *Events) getEvents(since int) ([]SyncthingEvent, error) { var data []byte - ev := []STEvent{} + ev := []SyncthingEvent{} url := "events" if since != -1 { url += "?since=" + strconv.Itoa(since) @@ -148,6 +188,8 @@ func (e *Events) getEvents(since int) ([]STEvent, error) { func (e *Events) monitorLoop() { e.log.Infof("Event monitoring running...") since := 0 + cntErrConn := 0 + cntErrRetry := 1 for { select { case <-e.stop: @@ -155,11 +197,32 @@ func (e *Events) monitorLoop() { return case <-time.After(e.MonitorTime * time.Millisecond): + + if !e.st.Connected { + cntErrConn++ + time.Sleep(time.Second) + if cntErrConn > cntErrRetry { + e.log.Error("ST Event monitor: ST connection down") + cntErrConn = 0 + cntErrRetry *= 2 + if _, err := e.getEvents(since); err == nil { + e.st.Connected = true + cntErrRetry = 1 + // XXX - should we reset since value ? + goto readEvent + } + } + continue + } + + readEvent: stEvArr, err := e.getEvents(since) if err != nil { e.log.Errorf("Syncthing Get Events: %v", err) + e.st.Connected = false continue } + // Process events for _, stEv := range stEvArr { since = stEv.SubscriptionID @@ -167,8 +230,10 @@ func (e *Events) monitorLoop() { e.log.Warnf("ST EVENT: %d %s\n %v", stEv.GlobalID, stEv.Type, stEv) } + e.mutex.Lock() cbKey, ok := e.cbArr[stEv.Type] if !ok { + e.mutex.Unlock() continue } @@ -224,6 +289,8 @@ func (e *Events) monitorLoop() { c.cb(evData, c.data) } } + + e.mutex.Unlock() } } }