2 * Copyright (C) 2017-2018 "IoT.bzh"
3 * Author Sebastien Douheret <sebastien@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
28 "github.com/Sirupsen/logrus"
29 uuid "github.com/satori/go.uuid"
30 "github.com/syncthing/syncthing/lib/sync"
35 MonitorTime time.Duration
41 cbArr map[string][]cbMap
45 // Event is the simplified Syncthing event handed to registered callbacks.
47 Type string `json:"type"`
48 Time time.Time `json:"time"`
49 Data map[string]string `json:"data"`
52 // EventsCBData is the data parameter passed to an event callback.
53 type EventsCBData map[string]interface{}
55 // EventsCB is the signature of an event callback.
56 type EventsCB func(ev Event, cbData *EventsCBData)
59 // EventFolderCompletion is the name of the Syncthing "FolderCompletion" event.
60 EventFolderCompletion string = "FolderCompletion"
61 // EventFolderSummary is the name of the Syncthing "FolderSummary" event.
62 EventFolderSummary string = "FolderSummary"
63 // EventFolderPaused is the name of the Syncthing "FolderPaused" event.
64 EventFolderPaused string = "FolderPaused"
65 // EventFolderResumed is the name of the Syncthing "FolderResumed" event.
66 EventFolderResumed string = "FolderResumed"
67 // EventFolderErrors is the name of the Syncthing "FolderErrors" event.
68 EventFolderErrors string = "FolderErrors"
69 // EventStateChanged is the name of the Syncthing "StateChanged" event.
70 EventStateChanged string = "StateChanged"
74 var EventsAll = EventFolderCompletion + "|" +
75 EventFolderSummary + "|" +
76 EventFolderPaused + "|" +
77 EventFolderResumed + "|" +
78 EventFolderErrors + "|" +
81 // SyncthingEvent is the structure of an event as decoded from the Syncthing REST API.
82 type SyncthingEvent struct {
83 // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
84 SubscriptionID int `json:"id"`
85 // Global ID of the event across all subscriptions
86 GlobalID int `json:"globalID"`
87 Time time.Time `json:"time"`
88 Type string `json:"type"`
89 Data map[string]interface{} `json:"data"`
99 // NewEventListener creates a new instance of the event listener.
100 func (s *SyncThing) NewEventListener() *Events {
101 _, dbg := os.LookupEnv("XDS_DEBUG_STEVENTS") // set to add more debug log
103 MonitorTime: 100, // in Milliseconds
105 stop: make(chan bool, 1),
108 cbArr: make(map[string][]cbMap),
109 mutex: sync.NewMutex(),
113 // Start starts event monitoring loop
114 func (e *Events) Start() error {
119 // Stop stops event monitoring loop
120 func (e *Events) Stop() {
124 // Register adds a listener (callback) on the event named evName.
125 func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (string, error) {
126 if evName == "" || !strings.Contains(EventsAll, evName) {
127 return "", fmt.Errorf("Unknown event name")
130 data = &EventsCBData{}
134 defer e.mutex.Unlock()
137 if _, ok := e.cbArr[evName]; ok {
138 cbList = e.cbArr[evName]
141 id := uuid.NewV1().String()
144 e.cbArr[evName] = append(cbList, cbMap{id: id, cb: cb, filterID: filterID, data: data})
149 // UnRegister removes the event listener identified by id.
150 func (e *Events) UnRegister(id string) error {
152 defer e.mutex.Unlock()
154 for evName, cbKey := range e.cbArr {
155 newCbList := []cbMap{}
157 for _, k := range cbKey {
159 newCbList = append(newCbList, k)
165 e.cbArr[evName] = newCbList
172 // getEvents returns the Syncthing events
173 func (e *Events) getEvents(since int) ([]SyncthingEvent, error) {
175 ev := []SyncthingEvent{}
178 url += "?since=" + strconv.Itoa(since)
180 if err := e.st.client.HTTPGet(url, &data); err != nil {
183 err := json.Unmarshal(data, &ev)
187 // monitorLoop is the loop that monitors Syncthing events
188 func (e *Events) monitorLoop() {
189 e.log.Infof("Event monitoring running...")
196 e.log.Infof("Event monitoring exited")
199 case <-time.After(e.MonitorTime * time.Millisecond):
203 time.Sleep(time.Second)
204 if cntErrConn > cntErrRetry {
205 e.log.Error("ST Event monitor: ST connection down")
208 if _, err := e.getEvents(since); err == nil {
209 e.st.Connected = true
211 // XXX - should we reset since value ?
219 stEvArr, err := e.getEvents(since)
221 e.log.Errorf("Syncthing Get Events: %v", err)
222 e.st.Connected = false
227 for _, stEv := range stEvArr {
228 since = stEv.SubscriptionID
230 e.log.Warnf("ST EVENT: %d %s\n %v", stEv.GlobalID, stEv.Type, stEv)
234 cbKey, ok := e.cbArr[stEv.Type]
246 // FIXME: re-define a data struct for each event
247 // instead of a map of strings, and use JSON marshalling/unmarshalling
249 evData.Data = make(map[string]string)
252 case EventFolderCompletion:
253 fID = convString(stEv.Data["folder"])
254 evData.Data["completion"] = convFloat64(stEv.Data["completion"])
256 case EventFolderSummary:
257 fID = convString(stEv.Data["folder"])
258 evData.Data["needBytes"] = convInt64(stEv.Data["needBytes"])
259 evData.Data["state"] = convString(stEv.Data["state"])
261 case EventFolderPaused, EventFolderResumed:
262 fID = convString(stEv.Data["id"])
263 evData.Data["label"] = convString(stEv.Data["label"])
265 case EventFolderErrors:
266 fID = convString(stEv.Data["folder"])
267 // TODO decode array evData.Data["errors"] = convString(stEv.Data["errors"])
269 case EventStateChanged:
270 fID = convString(stEv.Data["folder"])
271 evData.Data["from"] = convString(stEv.Data["from"])
272 evData.Data["to"] = convString(stEv.Data["to"])
275 e.log.Warnf("Unsupported event type")
279 evData.Data["id"] = fID
282 // Call all registered callbacks
283 for _, c := range cbKey {
285 e.log.Warnf("EVENT CB fID=%s, filterID=%s", fID, c.filterID)
287 // Call when filterID is not set or when it matches
288 if c.filterID == "" || (fID != "" && fID == c.filterID) {
299 func convString(d interface{}) string {
303 func convFloat64(d interface{}) string {
304 return strconv.FormatFloat(d.(float64), 'f', -1, 64)
307 func convInt64(d interface{}) string {
308 return strconv.FormatInt(d.(int64), 10)