+/*
+ * Copyright (C) 2017-2018 "IoT.bzh"
+ * Author Sebastien Douheret <sebastien@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package st
import (
"time"
"github.com/Sirupsen/logrus"
+ uuid "github.com/satori/go.uuid"
+ "github.com/syncthing/syncthing/lib/sync"
)
// Events .
st *SyncThing
log *logrus.Logger
cbArr map[string][]cbMap
+ mutex sync.Mutex
}
+// Event Syncthing events structure
type Event struct {
Type string `json:"type"`
Time time.Time `json:"time"`
Data map[string]string `json:"data"`
}
+// EventsCBData Data parameter of Event callback
type EventsCBData map[string]interface{}
+
+// EventsCB Event callback
type EventsCB func(ev Event, cbData *EventsCBData)
const (
+ // EventFolderCompletion .
EventFolderCompletion string = "FolderCompletion"
- EventFolderSummary string = "FolderSummary"
- EventFolderPaused string = "FolderPaused"
- EventFolderResumed string = "FolderResumed"
- EventFolderErrors string = "FolderErrors"
- EventStateChanged string = "StateChanged"
+ // EventFolderSummary .
+ EventFolderSummary string = "FolderSummary"
+ // EventFolderPaused .
+ EventFolderPaused string = "FolderPaused"
+ // EventFolderResumed .
+ EventFolderResumed string = "FolderResumed"
+ // EventFolderErrors .
+ EventFolderErrors string = "FolderErrors"
+ // EventStateChanged .
+ EventStateChanged string = "StateChanged"
)
-var EventsAll string = EventFolderCompletion + "|" +
+// EventsAll .
+var EventsAll = EventFolderCompletion + "|" +
EventFolderSummary + "|" +
EventFolderPaused + "|" +
EventFolderResumed + "|" +
EventFolderErrors + "|" +
EventStateChanged
-type STEvent struct {
+// SyncthingEvent Syncthing Event structure definition
+type SyncthingEvent struct {
// Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
SubscriptionID int `json:"id"`
// Global ID of the event across all subscriptions
}
type cbMap struct {
- id int
+ id string
cb EventsCB
filterID string
data *EventsCBData
st: s,
log: s.log,
cbArr: make(map[string][]cbMap),
+ mutex: sync.NewMutex(),
}
}
}
// Register Add a listener on an event
-func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (int, error) {
+func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (string, error) {
if evName == "" || !strings.Contains(EventsAll, evName) {
- return -1, fmt.Errorf("Unknown event name")
+ return "", fmt.Errorf("Unknown event name")
}
if data == nil {
data = &EventsCBData{}
}
+ e.mutex.Lock()
+ defer e.mutex.Unlock()
+
cbList := []cbMap{}
if _, ok := e.cbArr[evName]; ok {
cbList = e.cbArr[evName]
}
- id := len(cbList)
- (*data)["id"] = strconv.Itoa(id)
+ id := uuid.NewV1().String()
+ (*data)["id"] = id
e.cbArr[evName] = append(cbList, cbMap{id: id, cb: cb, filterID: filterID, data: data})
}
// UnRegister Remove a listener event
-func (e *Events) UnRegister(evName string, id int) error {
- cbKey, ok := e.cbArr[evName]
- if !ok {
- return fmt.Errorf("No event registered to such name")
- }
-
- // FIXME - NOT TESTED
- if id >= len(cbKey) {
- return fmt.Errorf("Invalid id")
- } else if id == len(cbKey) {
- e.cbArr[evName] = cbKey[:id-1]
- } else {
- e.cbArr[evName] = cbKey[id : id+1]
+func (e *Events) UnRegister(id string) error {
+ e.mutex.Lock()
+ defer e.mutex.Unlock()
+
+ for evName, cbKey := range e.cbArr {
+ newCbList := []cbMap{}
+ change := false
+ for _, k := range cbKey {
+ if k.id != id {
+ newCbList = append(newCbList, k)
+ } else {
+ change = true
+ }
+ }
+ if change {
+ e.cbArr[evName] = newCbList
+ }
}
return nil
}
// GetEvents returns the Syncthing events
-func (e *Events) getEvents(since int) ([]STEvent, error) {
+func (e *Events) getEvents(since int) ([]SyncthingEvent, error) {
var data []byte
- ev := []STEvent{}
+ ev := []SyncthingEvent{}
url := "events"
if since != -1 {
url += "?since=" + strconv.Itoa(since)
func (e *Events) monitorLoop() {
e.log.Infof("Event monitoring running...")
since := 0
+ cntErrConn := 0
+ cntErrRetry := 1
for {
select {
case <-e.stop:
return
case <-time.After(e.MonitorTime * time.Millisecond):
+
+ if !e.st.Connected {
+ cntErrConn++
+ time.Sleep(time.Second)
+ if cntErrConn > cntErrRetry {
+ e.log.Error("ST Event monitor: ST connection down")
+ cntErrConn = 0
+ cntErrRetry *= 2
+ if _, err := e.getEvents(since); err == nil {
+ e.st.Connected = true
+ cntErrRetry = 1
+ // XXX - should we reset since value ?
+ goto readEvent
+ }
+ }
+ continue
+ }
+
+ readEvent:
stEvArr, err := e.getEvents(since)
if err != nil {
e.log.Errorf("Syncthing Get Events: %v", err)
+ e.st.Connected = false
continue
}
+
// Process events
for _, stEv := range stEvArr {
since = stEv.SubscriptionID
e.log.Warnf("ST EVENT: %d %s\n %v", stEv.GlobalID, stEv.Type, stEv)
}
+ e.mutex.Lock()
cbKey, ok := e.cbArr[stEv.Type]
if !ok {
+ e.mutex.Unlock()
continue
}
c.cb(evData, c.data)
}
}
+
+ e.mutex.Unlock()
}
}
}