2 * Copyright (C) 2017 "IoT.bzh"
3 * Author Sebastien Douheret <sebastien@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
28 "github.com/Sirupsen/logrus"
29 uuid "github.com/satori/go.uuid"
30 "github.com/syncthing/syncthing/lib/sync"
35 MonitorTime time.Duration
41 cbArr map[string][]cbMap
46 Type string `json:"type"`
47 Time time.Time `json:"time"`
48 Data map[string]string `json:"data"`
51 type EventsCBData map[string]interface{}
52 type EventsCB func(ev Event, cbData *EventsCBData)
55 EventFolderCompletion string = "FolderCompletion"
56 EventFolderSummary string = "FolderSummary"
57 EventFolderPaused string = "FolderPaused"
58 EventFolderResumed string = "FolderResumed"
59 EventFolderErrors string = "FolderErrors"
60 EventStateChanged string = "StateChanged"
63 var EventsAll string = EventFolderCompletion + "|" +
64 EventFolderSummary + "|" +
65 EventFolderPaused + "|" +
66 EventFolderResumed + "|" +
67 EventFolderErrors + "|" +
71 // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
72 SubscriptionID int `json:"id"`
73 // Global ID of the event across all subscriptions
74 GlobalID int `json:"globalID"`
75 Time time.Time `json:"time"`
76 Type string `json:"type"`
77 Data map[string]interface{} `json:"data"`
87 // NewEventListener Create a new instance of Event listener
88 func (s *SyncThing) NewEventListener() *Events {
89 _, dbg := os.LookupEnv("XDS_DEBUG_STEVENTS") // set to add more debug log
91 MonitorTime: 100, // in Milliseconds
93 stop: make(chan bool, 1),
96 cbArr: make(map[string][]cbMap),
97 mutex: sync.NewMutex(),
101 // Start starts event monitoring loop
102 func (e *Events) Start() error {
107 // Stop stops event monitoring loop
108 func (e *Events) Stop() {
112 // Register Add a listener on an event
113 func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (string, error) {
114 if evName == "" || !strings.Contains(EventsAll, evName) {
115 return "", fmt.Errorf("Unknown event name")
118 data = &EventsCBData{}
122 defer e.mutex.Unlock()
125 if _, ok := e.cbArr[evName]; ok {
126 cbList = e.cbArr[evName]
129 id := uuid.NewV1().String()
132 e.cbArr[evName] = append(cbList, cbMap{id: id, cb: cb, filterID: filterID, data: data})
137 // UnRegister Remove a listener event
138 func (e *Events) UnRegister(id string) error {
140 defer e.mutex.Unlock()
142 for evName, cbKey := range e.cbArr {
143 newCbList := []cbMap{}
145 for _, k := range cbKey {
147 newCbList = append(newCbList, k)
153 e.cbArr[evName] = newCbList
160 // GetEvents returns the Syncthing events
161 func (e *Events) getEvents(since int) ([]STEvent, error) {
166 url += "?since=" + strconv.Itoa(since)
168 if err := e.st.client.HTTPGet(url, &data); err != nil {
171 err := json.Unmarshal(data, &ev)
175 // Loop to monitor Syncthing events
176 func (e *Events) monitorLoop() {
177 e.log.Infof("Event monitoring running...")
184 e.log.Infof("Event monitoring exited")
187 case <-time.After(e.MonitorTime * time.Millisecond):
191 time.Sleep(time.Second)
192 if cntErrConn > cntErrRetry {
193 e.log.Error("ST Event monitor: ST connection down")
196 if _, err := e.getEvents(since); err == nil {
197 e.st.Connected = true
199 // XXX - should we reset since value ?
207 stEvArr, err := e.getEvents(since)
209 e.log.Errorf("Syncthing Get Events: %v", err)
210 e.st.Connected = false
215 for _, stEv := range stEvArr {
216 since = stEv.SubscriptionID
218 e.log.Warnf("ST EVENT: %d %s\n %v", stEv.GlobalID, stEv.Type, stEv)
222 cbKey, ok := e.cbArr[stEv.Type]
234 // FIXME: re-define data struct for each events
235 // instead of map of string and use JSON marshing/unmarshing
237 evData.Data = make(map[string]string)
240 case EventFolderCompletion:
241 fID = convString(stEv.Data["folder"])
242 evData.Data["completion"] = convFloat64(stEv.Data["completion"])
244 case EventFolderSummary:
245 fID = convString(stEv.Data["folder"])
246 evData.Data["needBytes"] = convInt64(stEv.Data["needBytes"])
247 evData.Data["state"] = convString(stEv.Data["state"])
249 case EventFolderPaused, EventFolderResumed:
250 fID = convString(stEv.Data["id"])
251 evData.Data["label"] = convString(stEv.Data["label"])
253 case EventFolderErrors:
254 fID = convString(stEv.Data["folder"])
255 // TODO decode array evData.Data["errors"] = convString(stEv.Data["errors"])
257 case EventStateChanged:
258 fID = convString(stEv.Data["folder"])
259 evData.Data["from"] = convString(stEv.Data["from"])
260 evData.Data["to"] = convString(stEv.Data["to"])
263 e.log.Warnf("Unsupported event type")
267 evData.Data["id"] = fID
270 // Call all registered callbacks
271 for _, c := range cbKey {
273 e.log.Warnf("EVENT CB fID=%s, filterID=%s", fID, c.filterID)
275 // Call when filterID is not set or when it matches
276 if c.filterID == "" || (fID != "" && fID == c.filterID) {
287 func convString(d interface{}) string {
291 func convFloat64(d interface{}) string {
292 return strconv.FormatFloat(d.(float64), 'f', -1, 64)
295 func convInt64(d interface{}) string {
296 return strconv.FormatInt(d.(int64), 10)