Migration to AGL gerrit (update go import)
[src/xds/xds-server.git] / lib / syncthing / stEvent.go
1 /*
2  * Copyright (C) 2017-2018 "IoT.bzh"
3  * Author Sebastien Douheret <sebastien@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package st
19
20 import (
21         "encoding/json"
22         "fmt"
23         "os"
24         "strconv"
25         "strings"
26         "time"
27
28         "github.com/Sirupsen/logrus"
29         uuid "github.com/satori/go.uuid"
30         "github.com/syncthing/syncthing/lib/sync"
31 )
32
33 // Events .
34 type Events struct {
35         MonitorTime time.Duration
36         Debug       bool
37
38         stop  chan bool
39         st    *SyncThing
40         log   *logrus.Logger
41         cbArr map[string][]cbMap
42         mutex sync.Mutex
43 }
44
45 type Event struct {
46         Type string            `json:"type"`
47         Time time.Time         `json:"time"`
48         Data map[string]string `json:"data"`
49 }
50
51 type EventsCBData map[string]interface{}
52 type EventsCB func(ev Event, cbData *EventsCBData)
53
54 const (
55         EventFolderCompletion string = "FolderCompletion"
56         EventFolderSummary    string = "FolderSummary"
57         EventFolderPaused     string = "FolderPaused"
58         EventFolderResumed    string = "FolderResumed"
59         EventFolderErrors     string = "FolderErrors"
60         EventStateChanged     string = "StateChanged"
61 )
62
63 var EventsAll string = EventFolderCompletion + "|" +
64         EventFolderSummary + "|" +
65         EventFolderPaused + "|" +
66         EventFolderResumed + "|" +
67         EventFolderErrors + "|" +
68         EventStateChanged
69
70 type STEvent struct {
71         // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
72         SubscriptionID int `json:"id"`
73         // Global ID of the event across all subscriptions
74         GlobalID int                    `json:"globalID"`
75         Time     time.Time              `json:"time"`
76         Type     string                 `json:"type"`
77         Data     map[string]interface{} `json:"data"`
78 }
79
80 type cbMap struct {
81         id       string
82         cb       EventsCB
83         filterID string
84         data     *EventsCBData
85 }
86
87 // NewEventListener Create a new instance of Event listener
88 func (s *SyncThing) NewEventListener() *Events {
89         _, dbg := os.LookupEnv("XDS_DEBUG_STEVENTS") // set to add more debug log
90         return &Events{
91                 MonitorTime: 100, // in Milliseconds
92                 Debug:       dbg,
93                 stop:        make(chan bool, 1),
94                 st:          s,
95                 log:         s.log,
96                 cbArr:       make(map[string][]cbMap),
97                 mutex:       sync.NewMutex(),
98         }
99 }
100
101 // Start starts event monitoring loop
102 func (e *Events) Start() error {
103         go e.monitorLoop()
104         return nil
105 }
106
107 // Stop stops event monitoring loop
108 func (e *Events) Stop() {
109         e.stop <- true
110 }
111
112 // Register Add a listener on an event
113 func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (string, error) {
114         if evName == "" || !strings.Contains(EventsAll, evName) {
115                 return "", fmt.Errorf("Unknown event name")
116         }
117         if data == nil {
118                 data = &EventsCBData{}
119         }
120
121         e.mutex.Lock()
122         defer e.mutex.Unlock()
123
124         cbList := []cbMap{}
125         if _, ok := e.cbArr[evName]; ok {
126                 cbList = e.cbArr[evName]
127         }
128
129         id := uuid.NewV1().String()
130         (*data)["id"] = id
131
132         e.cbArr[evName] = append(cbList, cbMap{id: id, cb: cb, filterID: filterID, data: data})
133
134         return id, nil
135 }
136
137 // UnRegister Remove a listener event
138 func (e *Events) UnRegister(id string) error {
139         e.mutex.Lock()
140         defer e.mutex.Unlock()
141
142         for evName, cbKey := range e.cbArr {
143                 newCbList := []cbMap{}
144                 change := false
145                 for _, k := range cbKey {
146                         if k.id != id {
147                                 newCbList = append(newCbList, k)
148                         } else {
149                                 change = true
150                         }
151                 }
152                 if change {
153                         e.cbArr[evName] = newCbList
154                 }
155         }
156
157         return nil
158 }
159
160 // GetEvents returns the Syncthing events
161 func (e *Events) getEvents(since int) ([]STEvent, error) {
162         var data []byte
163         ev := []STEvent{}
164         url := "events"
165         if since != -1 {
166                 url += "?since=" + strconv.Itoa(since)
167         }
168         if err := e.st.client.HTTPGet(url, &data); err != nil {
169                 return ev, err
170         }
171         err := json.Unmarshal(data, &ev)
172         return ev, err
173 }
174
175 // Loop to monitor Syncthing events
176 func (e *Events) monitorLoop() {
177         e.log.Infof("Event monitoring running...")
178         since := 0
179         cntErrConn := 0
180         cntErrRetry := 1
181         for {
182                 select {
183                 case <-e.stop:
184                         e.log.Infof("Event monitoring exited")
185                         return
186
187                 case <-time.After(e.MonitorTime * time.Millisecond):
188
189                         if !e.st.Connected {
190                                 cntErrConn++
191                                 time.Sleep(time.Second)
192                                 if cntErrConn > cntErrRetry {
193                                         e.log.Error("ST Event monitor: ST connection down")
194                                         cntErrConn = 0
195                                         cntErrRetry *= 2
196                                         if _, err := e.getEvents(since); err == nil {
197                                                 e.st.Connected = true
198                                                 cntErrRetry = 1
199                                                 // XXX - should we reset since value ?
200                                                 goto readEvent
201                                         }
202                                 }
203                                 continue
204                         }
205
206                 readEvent:
207                         stEvArr, err := e.getEvents(since)
208                         if err != nil {
209                                 e.log.Errorf("Syncthing Get Events: %v", err)
210                                 e.st.Connected = false
211                                 continue
212                         }
213
214                         // Process events
215                         for _, stEv := range stEvArr {
216                                 since = stEv.SubscriptionID
217                                 if e.Debug {
218                                         e.log.Warnf("ST EVENT: %d %s\n  %v", stEv.GlobalID, stEv.Type, stEv)
219                                 }
220
221                                 e.mutex.Lock()
222                                 cbKey, ok := e.cbArr[stEv.Type]
223                                 if !ok {
224                                         e.mutex.Unlock()
225                                         continue
226                                 }
227
228                                 evData := Event{
229                                         Type: stEv.Type,
230                                         Time: stEv.Time,
231                                 }
232
233                                 // Decode Events
234                                 // FIXME: re-define data struct for each events
235                                 // instead of map of string and use JSON marshing/unmarshing
236                                 fID := ""
237                                 evData.Data = make(map[string]string)
238                                 switch stEv.Type {
239
240                                 case EventFolderCompletion:
241                                         fID = convString(stEv.Data["folder"])
242                                         evData.Data["completion"] = convFloat64(stEv.Data["completion"])
243
244                                 case EventFolderSummary:
245                                         fID = convString(stEv.Data["folder"])
246                                         evData.Data["needBytes"] = convInt64(stEv.Data["needBytes"])
247                                         evData.Data["state"] = convString(stEv.Data["state"])
248
249                                 case EventFolderPaused, EventFolderResumed:
250                                         fID = convString(stEv.Data["id"])
251                                         evData.Data["label"] = convString(stEv.Data["label"])
252
253                                 case EventFolderErrors:
254                                         fID = convString(stEv.Data["folder"])
255                                         // TODO decode array evData.Data["errors"] = convString(stEv.Data["errors"])
256
257                                 case EventStateChanged:
258                                         fID = convString(stEv.Data["folder"])
259                                         evData.Data["from"] = convString(stEv.Data["from"])
260                                         evData.Data["to"] = convString(stEv.Data["to"])
261
262                                 default:
263                                         e.log.Warnf("Unsupported event type")
264                                 }
265
266                                 if fID != "" {
267                                         evData.Data["id"] = fID
268                                 }
269
270                                 // Call all registered callbacks
271                                 for _, c := range cbKey {
272                                         if e.Debug {
273                                                 e.log.Warnf("EVENT CB fID=%s, filterID=%s", fID, c.filterID)
274                                         }
275                                         // Call when filterID is not set or when it matches
276                                         if c.filterID == "" || (fID != "" && fID == c.filterID) {
277                                                 c.cb(evData, c.data)
278                                         }
279                                 }
280
281                                 e.mutex.Unlock()
282                         }
283                 }
284         }
285 }
286
287 func convString(d interface{}) string {
288         return d.(string)
289 }
290
291 func convFloat64(d interface{}) string {
292         return strconv.FormatFloat(d.(float64), 'f', -1, 64)
293 }
294
295 func convInt64(d interface{}) string {
296         return strconv.FormatInt(d.(int64), 10)
297 }