Fixed go linter warnings.
[src/xds/xds-server.git] / lib / syncthing / stEvent.go
1 /*
2  * Copyright (C) 2017-2018 "IoT.bzh"
3  * Author Sebastien Douheret <sebastien@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package st
19
20 import (
21         "encoding/json"
22         "fmt"
23         "os"
24         "strconv"
25         "strings"
26         "time"
27
28         "github.com/Sirupsen/logrus"
29         uuid "github.com/satori/go.uuid"
30         "github.com/syncthing/syncthing/lib/sync"
31 )
32
33 // Events .
34 type Events struct {
35         MonitorTime time.Duration
36         Debug       bool
37
38         stop  chan bool
39         st    *SyncThing
40         log   *logrus.Logger
41         cbArr map[string][]cbMap
42         mutex sync.Mutex
43 }
44
45 // Event Syncthing events structure
46 type Event struct {
47         Type string            `json:"type"`
48         Time time.Time         `json:"time"`
49         Data map[string]string `json:"data"`
50 }
51
52 // EventsCBData Data parameter of Event callback
53 type EventsCBData map[string]interface{}
54
55 // EventsCB Event callback
56 type EventsCB func(ev Event, cbData *EventsCBData)
57
58 const (
59         // EventFolderCompletion .
60         EventFolderCompletion string = "FolderCompletion"
61         // EventFolderSummary .
62         EventFolderSummary string = "FolderSummary"
63         // EventFolderPaused .
64         EventFolderPaused string = "FolderPaused"
65         // EventFolderResumed .
66         EventFolderResumed string = "FolderResumed"
67         // EventFolderErrors .
68         EventFolderErrors string = "FolderErrors"
69         // EventStateChanged .
70         EventStateChanged string = "StateChanged"
71 )
72
73 // EventsAll .
74 var EventsAll = EventFolderCompletion + "|" +
75         EventFolderSummary + "|" +
76         EventFolderPaused + "|" +
77         EventFolderResumed + "|" +
78         EventFolderErrors + "|" +
79         EventStateChanged
80
81 // SyncthingEvent Syncthing Event structure definition
82 type SyncthingEvent struct {
83         // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
84         SubscriptionID int `json:"id"`
85         // Global ID of the event across all subscriptions
86         GlobalID int                    `json:"globalID"`
87         Time     time.Time              `json:"time"`
88         Type     string                 `json:"type"`
89         Data     map[string]interface{} `json:"data"`
90 }
91
92 type cbMap struct {
93         id       string
94         cb       EventsCB
95         filterID string
96         data     *EventsCBData
97 }
98
99 // NewEventListener Create a new instance of Event listener
100 func (s *SyncThing) NewEventListener() *Events {
101         _, dbg := os.LookupEnv("XDS_DEBUG_STEVENTS") // set to add more debug log
102         return &Events{
103                 MonitorTime: 100, // in Milliseconds
104                 Debug:       dbg,
105                 stop:        make(chan bool, 1),
106                 st:          s,
107                 log:         s.log,
108                 cbArr:       make(map[string][]cbMap),
109                 mutex:       sync.NewMutex(),
110         }
111 }
112
113 // Start starts event monitoring loop
114 func (e *Events) Start() error {
115         go e.monitorLoop()
116         return nil
117 }
118
119 // Stop stops event monitoring loop
120 func (e *Events) Stop() {
121         e.stop <- true
122 }
123
124 // Register Add a listener on an event
125 func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (string, error) {
126         if evName == "" || !strings.Contains(EventsAll, evName) {
127                 return "", fmt.Errorf("Unknown event name")
128         }
129         if data == nil {
130                 data = &EventsCBData{}
131         }
132
133         e.mutex.Lock()
134         defer e.mutex.Unlock()
135
136         cbList := []cbMap{}
137         if _, ok := e.cbArr[evName]; ok {
138                 cbList = e.cbArr[evName]
139         }
140
141         id := uuid.NewV1().String()
142         (*data)["id"] = id
143
144         e.cbArr[evName] = append(cbList, cbMap{id: id, cb: cb, filterID: filterID, data: data})
145
146         return id, nil
147 }
148
149 // UnRegister Remove a listener event
150 func (e *Events) UnRegister(id string) error {
151         e.mutex.Lock()
152         defer e.mutex.Unlock()
153
154         for evName, cbKey := range e.cbArr {
155                 newCbList := []cbMap{}
156                 change := false
157                 for _, k := range cbKey {
158                         if k.id != id {
159                                 newCbList = append(newCbList, k)
160                         } else {
161                                 change = true
162                         }
163                 }
164                 if change {
165                         e.cbArr[evName] = newCbList
166                 }
167         }
168
169         return nil
170 }
171
172 // GetEvents returns the Syncthing events
173 func (e *Events) getEvents(since int) ([]SyncthingEvent, error) {
174         var data []byte
175         ev := []SyncthingEvent{}
176         url := "events"
177         if since != -1 {
178                 url += "?since=" + strconv.Itoa(since)
179         }
180         if err := e.st.client.HTTPGet(url, &data); err != nil {
181                 return ev, err
182         }
183         err := json.Unmarshal(data, &ev)
184         return ev, err
185 }
186
187 // Loop to monitor Syncthing events
188 func (e *Events) monitorLoop() {
189         e.log.Infof("Event monitoring running...")
190         since := 0
191         cntErrConn := 0
192         cntErrRetry := 1
193         for {
194                 select {
195                 case <-e.stop:
196                         e.log.Infof("Event monitoring exited")
197                         return
198
199                 case <-time.After(e.MonitorTime * time.Millisecond):
200
201                         if !e.st.Connected {
202                                 cntErrConn++
203                                 time.Sleep(time.Second)
204                                 if cntErrConn > cntErrRetry {
205                                         e.log.Error("ST Event monitor: ST connection down")
206                                         cntErrConn = 0
207                                         cntErrRetry *= 2
208                                         if _, err := e.getEvents(since); err == nil {
209                                                 e.st.Connected = true
210                                                 cntErrRetry = 1
211                                                 // XXX - should we reset since value ?
212                                                 goto readEvent
213                                         }
214                                 }
215                                 continue
216                         }
217
218                 readEvent:
219                         stEvArr, err := e.getEvents(since)
220                         if err != nil {
221                                 e.log.Errorf("Syncthing Get Events: %v", err)
222                                 e.st.Connected = false
223                                 continue
224                         }
225
226                         // Process events
227                         for _, stEv := range stEvArr {
228                                 since = stEv.SubscriptionID
229                                 if e.Debug {
230                                         e.log.Warnf("ST EVENT: %d %s\n  %v", stEv.GlobalID, stEv.Type, stEv)
231                                 }
232
233                                 e.mutex.Lock()
234                                 cbKey, ok := e.cbArr[stEv.Type]
235                                 if !ok {
236                                         e.mutex.Unlock()
237                                         continue
238                                 }
239
240                                 evData := Event{
241                                         Type: stEv.Type,
242                                         Time: stEv.Time,
243                                 }
244
245                                 // Decode Events
246                                 // FIXME: re-define data struct for each events
247                                 // instead of map of string and use JSON marshing/unmarshing
248                                 fID := ""
249                                 evData.Data = make(map[string]string)
250                                 switch stEv.Type {
251
252                                 case EventFolderCompletion:
253                                         fID = convString(stEv.Data["folder"])
254                                         evData.Data["completion"] = convFloat64(stEv.Data["completion"])
255
256                                 case EventFolderSummary:
257                                         fID = convString(stEv.Data["folder"])
258                                         evData.Data["needBytes"] = convInt64(stEv.Data["needBytes"])
259                                         evData.Data["state"] = convString(stEv.Data["state"])
260
261                                 case EventFolderPaused, EventFolderResumed:
262                                         fID = convString(stEv.Data["id"])
263                                         evData.Data["label"] = convString(stEv.Data["label"])
264
265                                 case EventFolderErrors:
266                                         fID = convString(stEv.Data["folder"])
267                                         // TODO decode array evData.Data["errors"] = convString(stEv.Data["errors"])
268
269                                 case EventStateChanged:
270                                         fID = convString(stEv.Data["folder"])
271                                         evData.Data["from"] = convString(stEv.Data["from"])
272                                         evData.Data["to"] = convString(stEv.Data["to"])
273
274                                 default:
275                                         e.log.Warnf("Unsupported event type")
276                                 }
277
278                                 if fID != "" {
279                                         evData.Data["id"] = fID
280                                 }
281
282                                 // Call all registered callbacks
283                                 for _, c := range cbKey {
284                                         if e.Debug {
285                                                 e.log.Warnf("EVENT CB fID=%s, filterID=%s", fID, c.filterID)
286                                         }
287                                         // Call when filterID is not set or when it matches
288                                         if c.filterID == "" || (fID != "" && fID == c.filterID) {
289                                                 c.cb(evData, c.data)
290                                         }
291                                 }
292
293                                 e.mutex.Unlock()
294                         }
295                 }
296         }
297 }
298
299 func convString(d interface{}) string {
300         return d.(string)
301 }
302
303 func convFloat64(d interface{}) string {
304         return strconv.FormatFloat(d.(float64), 'f', -1, 64)
305 }
306
307 func convInt64(d interface{}) string {
308         return strconv.FormatInt(d.(int64), 10)
309 }