9ca8b7839fbdd0a7e6dd41998b96cb4bd85fdd10
[src/xds/xds-agent.git] / lib / syncthing / stEvent.go
1 package st
2
3 import (
4         "encoding/json"
5         "fmt"
6         "os"
7         "strconv"
8         "strings"
9         "time"
10
11         "github.com/Sirupsen/logrus"
12 )
13
14 // Events .
15 type Events struct {
16         MonitorTime time.Duration
17         Debug       bool
18
19         stop  chan bool
20         st    *SyncThing
21         log   *logrus.Logger
22         cbArr map[string][]cbMap
23 }
24
// Event is the flattened form of a Syncthing event that is handed to
// registered callbacks: every payload value is carried as a string.
type Event struct {
	// Type is the Syncthing event type name.
	Type string `json:"type"`
	// Time is the timestamp copied from the Syncthing event.
	Time time.Time `json:"time"`
	// Data holds the decoded payload fields of the event.
	Data map[string]string `json:"data"`
}
30
31 type EventsCBData map[string]interface{}
32 type EventsCB func(ev Event, cbData *EventsCBData)
33
// Names of the Syncthing event types a listener can be registered on.
const (
	// EventFolderCompletion is the "FolderCompletion" event type.
	EventFolderCompletion string = "FolderCompletion"
	// EventFolderSummary is the "FolderSummary" event type.
	EventFolderSummary string = "FolderSummary"
	// EventFolderPaused is the "FolderPaused" event type.
	EventFolderPaused string = "FolderPaused"
	// EventFolderResumed is the "FolderResumed" event type.
	EventFolderResumed string = "FolderResumed"
	// EventFolderErrors is the "FolderErrors" event type.
	EventFolderErrors string = "FolderErrors"
	// EventStateChanged is the "StateChanged" event type.
	EventStateChanged string = "StateChanged"
)
42
43 var EventsAll string = EventFolderCompletion + "|" +
44         EventFolderSummary + "|" +
45         EventFolderPaused + "|" +
46         EventFolderResumed + "|" +
47         EventFolderErrors + "|" +
48         EventStateChanged
49
// STEvent is one event as decoded from the JSON returned by the Syncthing
// "events" endpoint.
type STEvent struct {
	// SubscriptionID is the per-subscription sequential event ID. It is
	// serialized as "id" for backwards compatibility with the REST API.
	SubscriptionID int `json:"id"`
	// GlobalID is the ID of the event across all subscriptions.
	GlobalID int `json:"globalID"`
	// Time is the timestamp carried by the event.
	Time time.Time `json:"time"`
	// Type is the event type name.
	Type string `json:"type"`
	// Data is the raw, event-specific payload.
	Data map[string]interface{} `json:"data"`
}
59
60 type cbMap struct {
61         id       int
62         cb       EventsCB
63         filterID string
64         data     *EventsCBData
65 }
66
67 // NewEventListener Create a new instance of Event listener
68 func (s *SyncThing) NewEventListener() *Events {
69         _, dbg := os.LookupEnv("XDS_DEBUG_STEVENTS") // set to add more debug log
70         return &Events{
71                 MonitorTime: 100, // in Milliseconds
72                 Debug:       dbg,
73                 stop:        make(chan bool, 1),
74                 st:          s,
75                 log:         s.log,
76                 cbArr:       make(map[string][]cbMap),
77         }
78 }
79
80 // Start starts event monitoring loop
81 func (e *Events) Start() error {
82         go e.monitorLoop()
83         return nil
84 }
85
86 // Stop stops event monitoring loop
87 func (e *Events) Stop() {
88         e.stop <- true
89 }
90
91 // Register Add a listener on an event
92 func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (int, error) {
93         if evName == "" || !strings.Contains(EventsAll, evName) {
94                 return -1, fmt.Errorf("Unknown event name")
95         }
96         if data == nil {
97                 data = &EventsCBData{}
98         }
99
100         cbList := []cbMap{}
101         if _, ok := e.cbArr[evName]; ok {
102                 cbList = e.cbArr[evName]
103         }
104
105         id := len(cbList)
106         (*data)["id"] = strconv.Itoa(id)
107
108         e.cbArr[evName] = append(cbList, cbMap{id: id, cb: cb, filterID: filterID, data: data})
109
110         return id, nil
111 }
112
113 // UnRegister Remove a listener event
114 func (e *Events) UnRegister(evName string, id int) error {
115         cbKey, ok := e.cbArr[evName]
116         if !ok {
117                 return fmt.Errorf("No event registered to such name")
118         }
119
120         // FIXME - NOT TESTED
121         if id >= len(cbKey) {
122                 return fmt.Errorf("Invalid id")
123         } else if id == len(cbKey) {
124                 e.cbArr[evName] = cbKey[:id-1]
125         } else {
126                 e.cbArr[evName] = cbKey[id : id+1]
127         }
128
129         return nil
130 }
131
132 // GetEvents returns the Syncthing events
133 func (e *Events) getEvents(since int) ([]STEvent, error) {
134         var data []byte
135         ev := []STEvent{}
136         url := "events"
137         if since != -1 {
138                 url += "?since=" + strconv.Itoa(since)
139         }
140         if err := e.st.client.HTTPGet(url, &data); err != nil {
141                 return ev, err
142         }
143         err := json.Unmarshal(data, &ev)
144         return ev, err
145 }
146
147 // Loop to monitor Syncthing events
148 func (e *Events) monitorLoop() {
149         e.log.Infof("Event monitoring running...")
150         since := 0
151         cntErrConn := 0
152         cntErrRetry := 1
153         for {
154                 select {
155                 case <-e.stop:
156                         e.log.Infof("Event monitoring exited")
157                         return
158
159                 case <-time.After(e.MonitorTime * time.Millisecond):
160
161                         if !e.st.Connected {
162                                 cntErrConn++
163                                 time.Sleep(time.Second)
164                                 if cntErrConn > cntErrRetry {
165                                         e.log.Error("ST Event monitor: ST connection down")
166                                         cntErrConn = 0
167                                         cntErrRetry *= 2
168                                         if _, err := e.getEvents(since); err == nil {
169                                                 e.st.Connected = true
170                                                 cntErrRetry = 1
171                                                 // XXX - should we reset since value ?
172                                                 goto readEvent
173                                         }
174                                 }
175                                 continue
176                         }
177
178                 readEvent:
179                         stEvArr, err := e.getEvents(since)
180                         if err != nil {
181                                 e.log.Errorf("Syncthing Get Events: %v", err)
182                                 e.st.Connected = false
183                                 continue
184                         }
185
186                         // Process events
187                         for _, stEv := range stEvArr {
188                                 since = stEv.SubscriptionID
189                                 if e.Debug {
190                                         e.log.Warnf("ST EVENT: %d %s\n  %v", stEv.GlobalID, stEv.Type, stEv)
191                                 }
192
193                                 cbKey, ok := e.cbArr[stEv.Type]
194                                 if !ok {
195                                         continue
196                                 }
197
198                                 evData := Event{
199                                         Type: stEv.Type,
200                                         Time: stEv.Time,
201                                 }
202
203                                 // Decode Events
204                                 // FIXME: re-define data struct for each events
205                                 // instead of map of string and use JSON marshing/unmarshing
206                                 fID := ""
207                                 evData.Data = make(map[string]string)
208                                 switch stEv.Type {
209
210                                 case EventFolderCompletion:
211                                         fID = convString(stEv.Data["folder"])
212                                         evData.Data["completion"] = convFloat64(stEv.Data["completion"])
213
214                                 case EventFolderSummary:
215                                         fID = convString(stEv.Data["folder"])
216                                         evData.Data["needBytes"] = convInt64(stEv.Data["needBytes"])
217                                         evData.Data["state"] = convString(stEv.Data["state"])
218
219                                 case EventFolderPaused, EventFolderResumed:
220                                         fID = convString(stEv.Data["id"])
221                                         evData.Data["label"] = convString(stEv.Data["label"])
222
223                                 case EventFolderErrors:
224                                         fID = convString(stEv.Data["folder"])
225                                         // TODO decode array evData.Data["errors"] = convString(stEv.Data["errors"])
226
227                                 case EventStateChanged:
228                                         fID = convString(stEv.Data["folder"])
229                                         evData.Data["from"] = convString(stEv.Data["from"])
230                                         evData.Data["to"] = convString(stEv.Data["to"])
231
232                                 default:
233                                         e.log.Warnf("Unsupported event type")
234                                 }
235
236                                 if fID != "" {
237                                         evData.Data["id"] = fID
238                                 }
239
240                                 // Call all registered callbacks
241                                 for _, c := range cbKey {
242                                         if e.Debug {
243                                                 e.log.Warnf("EVENT CB fID=%s, filterID=%s", fID, c.filterID)
244                                         }
245                                         // Call when filterID is not set or when it matches
246                                         if c.filterID == "" || (fID != "" && fID == c.filterID) {
247                                                 c.cb(evData, c.data)
248                                         }
249                                 }
250                         }
251                 }
252         }
253 }
254
// convString returns d as a string, or "" when d is not a string (for
// instance nil, which is what a missing payload field yields). An unchecked
// type assertion here would panic on such a payload.
func convString(d interface{}) string {
	s, _ := d.(string)
	return s
}
258
// convFloat64 formats a numeric value in plain decimal notation using the
// fewest digits that represent it exactly. It returns "" when d is not a
// number (for instance nil, which is what a missing payload field yields)
// instead of panicking on a failed type assertion.
func convFloat64(d interface{}) string {
	switch v := d.(type) {
	case float64:
		return strconv.FormatFloat(v, 'f', -1, 64)
	case float32:
		return strconv.FormatFloat(float64(v), 'f', -1, 32)
	case int:
		return strconv.FormatInt(int64(v), 10)
	case int64:
		return strconv.FormatInt(v, 10)
	}
	return ""
}
262
// convInt64 formats an integral value in base 10.
//
// Numbers decoded by encoding/json into an interface{} arrive as float64,
// never as int64, so float64 is accepted and truncated toward zero; int64
// and int are accepted as well. It returns "" when d is not a number (for
// instance nil, which is what a missing payload field yields).
func convInt64(d interface{}) string {
	switch v := d.(type) {
	case float64:
		return strconv.FormatInt(int64(v), 10)
	case int64:
		return strconv.FormatInt(v, 10)
	case int:
		return strconv.FormatInt(int64(v), 10)
	}
	return ""
}