Migration to AGL gerrit (update go import)
[src/xds/xds-agent.git] / lib / syncthing / stEvent.go
1 /*
2  * Copyright (C) 2017-2018 "IoT.bzh"
3  * Author Sebastien Douheret <sebastien@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package st
19
20 import (
21         "encoding/json"
22         "fmt"
23         "os"
24         "strconv"
25         "strings"
26         "time"
27
28         "github.com/Sirupsen/logrus"
29 )
30
// Events polls a Syncthing instance for events and dispatches them to the
// callbacks registered through Register.
type Events struct {
	// MonitorTime is the polling interval. monitorLoop multiplies it by
	// time.Millisecond, so it holds a plain number of milliseconds.
	MonitorTime time.Duration
	// Debug enables extra logging of every received event and callback
	// dispatch in monitorLoop.
	Debug       bool

	// stop carries the request to exit the monitoring loop (see Stop).
	stop  chan bool
	// st is the Syncthing instance whose client is queried for events.
	st    *SyncThing
	// log is the logger used by the monitoring loop.
	log   *logrus.Logger
	// cbArr holds the registered callbacks, keyed by event type name.
	cbArr map[string][]cbMap
}
41
// Event is the decoded event handed to registered callbacks.
// monitorLoop fills Data with the string-converted fields it extracts for the
// event type, plus an "id" entry holding the folder ID when one was found.
type Event struct {
	Type string            `json:"type"`
	Time time.Time         `json:"time"`
	Data map[string]string `json:"data"`
}
47
// EventsCBData is the user data attached to a listener at registration.
// Register stores the listener ID (as a decimal string) under the "id" key.
type EventsCBData map[string]interface{}

// EventsCB is the signature of a listener callback: it receives the decoded
// event and the data that was supplied to Register for that listener.
type EventsCB func(ev Event, cbData *EventsCBData)
50
// Names of the Syncthing event types that listeners can register for.
const (
	EventFolderCompletion string = "FolderCompletion"
	EventFolderSummary    string = "FolderSummary"
	EventFolderPaused     string = "FolderPaused"
	EventFolderResumed    string = "FolderResumed"
	EventFolderErrors     string = "FolderErrors"
	EventStateChanged     string = "StateChanged"
)

// EventsAll lists every supported event name, separated by "|".
var EventsAll = strings.Join([]string{
	EventFolderCompletion,
	EventFolderSummary,
	EventFolderPaused,
	EventFolderResumed,
	EventFolderErrors,
	EventStateChanged,
}, "|")
66
// STEvent is a raw Syncthing event, as decoded by getEvents from the JSON
// returned by the "events" endpoint. Data holds the untyped, event-specific
// payload.
type STEvent struct {
	// Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
	SubscriptionID int `json:"id"`
	// Global ID of the event across all subscriptions
	GlobalID int                    `json:"globalID"`
	Time     time.Time              `json:"time"`
	Type     string                 `json:"type"`
	Data     map[string]interface{} `json:"data"`
}
76
// cbMap describes one registered listener.
type cbMap struct {
	// id is the listener ID returned by Register.
	id       int
	// cb is the callback invoked by monitorLoop.
	cb       EventsCB
	// filterID restricts the callback to events of one folder ID; when empty
	// the callback is invoked for every event of the registered type.
	filterID string
	// data is the user data passed back to cb on every invocation.
	data     *EventsCBData
}
83
84 // NewEventListener Create a new instance of Event listener
85 func (s *SyncThing) NewEventListener() *Events {
86         _, dbg := os.LookupEnv("XDS_LOG_SILLY_STEVENTS") // set to add more debug log
87         return &Events{
88                 MonitorTime: 100, // in Milliseconds
89                 Debug:       dbg,
90                 stop:        make(chan bool, 1),
91                 st:          s,
92                 log:         s.log,
93                 cbArr:       make(map[string][]cbMap),
94         }
95 }
96
// Start launches the event monitoring loop in a new goroutine and returns
// immediately. It always returns nil.
func (e *Events) Start() error {
	go e.monitorLoop()
	return nil
}
102
// Stop asks the event monitoring loop to exit by sending on the stop channel.
// The loop logs and returns the next time it receives from that channel.
// NOTE(review): the send blocks once the channel buffer (size 1 when created
// by NewEventListener) is full, e.g. on repeated calls with no loop running.
func (e *Events) Stop() {
	e.stop <- true
}
107
108 // Register Add a listener on an event
109 func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (int, error) {
110         if evName == "" || !strings.Contains(EventsAll, evName) {
111                 return -1, fmt.Errorf("Unknown event name")
112         }
113         if data == nil {
114                 data = &EventsCBData{}
115         }
116
117         cbList := []cbMap{}
118         if _, ok := e.cbArr[evName]; ok {
119                 cbList = e.cbArr[evName]
120         }
121
122         id := len(cbList)
123         (*data)["id"] = strconv.Itoa(id)
124
125         e.cbArr[evName] = append(cbList, cbMap{id: id, cb: cb, filterID: filterID, data: data})
126
127         return id, nil
128 }
129
130 // UnRegister Remove a listener event
131 func (e *Events) UnRegister(evName string, id int) error {
132         cbKey, ok := e.cbArr[evName]
133         if !ok {
134                 return fmt.Errorf("No event registered to such name")
135         }
136
137         // FIXME - NOT TESTED
138         if id >= len(cbKey) {
139                 return fmt.Errorf("Invalid id")
140         } else if id == len(cbKey) {
141                 e.cbArr[evName] = cbKey[:id-1]
142         } else {
143                 e.cbArr[evName] = cbKey[id : id+1]
144         }
145
146         return nil
147 }
148
149 // GetEvents returns the Syncthing events
150 func (e *Events) getEvents(since int) ([]STEvent, error) {
151         var data []byte
152         ev := []STEvent{}
153         url := "events"
154         if since != -1 {
155                 url += "?since=" + strconv.Itoa(since)
156         }
157         if err := e.st.client.HTTPGet(url, &data); err != nil {
158                 return ev, err
159         }
160         err := json.Unmarshal(data, &ev)
161         return ev, err
162 }
163
// monitorLoop polls Syncthing for events until a value is received on e.stop.
// Each received event whose type has registered listeners is converted into
// an Event and passed to every listener whose filter matches.
func (e *Events) monitorLoop() {
	e.log.Infof("Event monitoring running...")
	// since is the ID of the last processed event, sent back to getEvents.
	since := 0
	// cntErrConn counts polling rounds spent disconnected since the last
	// reconnection attempt; an attempt is made once it exceeds cntErrRetry.
	cntErrConn := 0
	// cntErrRetry is doubled before each reconnection attempt and reset to 1
	// when an attempt succeeds.
	cntErrRetry := 1
	for {
		select {
		case <-e.stop:
			e.log.Infof("Event monitoring exited")
			return

		// MonitorTime holds a plain number of milliseconds.
		case <-time.After(e.MonitorTime * time.Millisecond):

			if !e.st.Connected {
				cntErrConn++
				// NOTE(review): the stop channel is not serviced during this sleep.
				time.Sleep(time.Second)
				if cntErrConn > cntErrRetry {
					e.log.Error("ST Event monitor: ST connection down")
					cntErrConn = 0
					cntErrRetry *= 2
					// Probe the connection; the events returned by this call
					// are discarded and fetched again at readEvent with the
					// same since value.
					if _, err := e.getEvents(since); err == nil {
						e.st.Connected = true
						cntErrRetry = 1
						// XXX - should we reset since value ?
						goto readEvent
					}
				}
				continue
			}

		readEvent:
			stEvArr, err := e.getEvents(since)
			if err != nil {
				// Mark the connection as down so that the next rounds go
				// through the reconnection path above.
				e.log.Errorf("Syncthing Get Events: %v", err)
				e.st.Connected = false
				continue
			}

			// Process events
			for _, stEv := range stEvArr {
				// Remember the last seen ID, even for events nobody listens to.
				since = stEv.SubscriptionID
				if e.Debug {
					e.log.Warnf("ST EVENT: %d %s\n  %v", stEv.GlobalID, stEv.Type, stEv)
				}

				// Skip event types without any registered listener.
				cbKey, ok := e.cbArr[stEv.Type]
				if !ok {
					continue
				}

				evData := Event{
					Type: stEv.Type,
					Time: stEv.Time,
				}

				// Decode Events
				// FIXME: re-define data struct for each events
				// instead of map of string and use JSON marshalling/unmarshalling
				// fID receives the folder ID carried by the event, if any.
				fID := ""
				evData.Data = make(map[string]string)
				switch stEv.Type {

				case EventFolderCompletion:
					fID = convString(stEv.Data["folder"])
					evData.Data["completion"] = convFloat64(stEv.Data["completion"])

				case EventFolderSummary:
					fID = convString(stEv.Data["folder"])
					evData.Data["needBytes"] = convInt64(stEv.Data["needBytes"])
					evData.Data["state"] = convString(stEv.Data["state"])

				// These two events carry the folder ID under "id", not "folder".
				case EventFolderPaused, EventFolderResumed:
					fID = convString(stEv.Data["id"])
					evData.Data["label"] = convString(stEv.Data["label"])

				case EventFolderErrors:
					fID = convString(stEv.Data["folder"])
					// TODO decode array evData.Data["errors"] = convString(stEv.Data["errors"])

				case EventStateChanged:
					fID = convString(stEv.Data["folder"])
					evData.Data["from"] = convString(stEv.Data["from"])
					evData.Data["to"] = convString(stEv.Data["to"])

				default:
					e.log.Warnf("Unsupported event type")
				}

				// Expose the folder ID to callbacks under the "id" key.
				if fID != "" {
					evData.Data["id"] = fID
				}

				// Call all registered callbacks
				for _, c := range cbKey {
					if e.Debug {
						e.log.Warnf("EVENT CB fID=%s, filterID=%s", fID, c.filterID)
					}
					// Call when filterID is not set or when it matches
					if c.filterID == "" || (fID != "" && fID == c.filterID) {
						c.cb(evData, c.data)
					}
				}
			}
		}
	}
}
271
// convString returns d as a string. It returns the empty string when d does
// not hold a string (including a nil interface, e.g. a missing map key)
// instead of panicking on the type assertion.
func convString(d interface{}) string {
	s, _ := d.(string)
	return s
}
275
// convFloat64 formats d, expected to hold a float64, as a decimal string
// using the smallest number of digits that represents the value exactly.
// It returns the empty string when d does not hold a float64 (including a
// nil interface, e.g. a missing map key) instead of panicking.
func convFloat64(d interface{}) string {
	f, ok := d.(float64)
	if !ok {
		return ""
	}
	return strconv.FormatFloat(f, 'f', -1, 64)
}
279
// convInt64 formats an integral value as a base-10 string.
//
// encoding/json stores JSON numbers as float64 when decoding into an
// interface value, so float64 is accepted in addition to int64 and int; a
// float64 is printed without fractional digits (rounded to the nearest
// integer). Any other type, including a nil interface (e.g. a missing map
// key), yields the empty string instead of a panic.
func convInt64(d interface{}) string {
	switch v := d.(type) {
	case int64:
		return strconv.FormatInt(v, 10)
	case int:
		return strconv.FormatInt(int64(v), 10)
	case float64:
		return strconv.FormatFloat(v, 'f', 0, 64)
	default:
		return ""
	}
}