Migration to AGL gerrit (update go import)
[src/xds/xds-agent.git] / lib / agent / xdsserver.go
1 /*
2  * Copyright (C) 2017-2018 "IoT.bzh"
3  * Author Sebastien Douheret <sebastien@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package agent
19
20 import (
21         "encoding/json"
22         "fmt"
23         "io"
24         "net/http"
25         "strings"
26         "sync"
27         "time"
28
29         "gerrit.automotivelinux.org/gerrit/src/xds/xds-agent/lib/xaapiv1"
30         "gerrit.automotivelinux.org/gerrit/src/xds/xds-agent/lib/xdsconfig"
31         common "gerrit.automotivelinux.org/gerrit/src/xds/xds-common.git/golib"
32         "gerrit.automotivelinux.org/gerrit/src/xds/xds-server.git/lib/xsapiv1"
33         "github.com/gin-gonic/gin"
34         uuid "github.com/satori/go.uuid"
35         sio_client "github.com/sebd71/go-socket.io-client"
36 )
37
38 // XdsServer .
39 type XdsServer struct {
40         *Context
41         ID           string
42         BaseURL      string
43         APIURL       string
44         PartialURL   string
45         ConnRetry    int
46         Connected    bool
47         Disabled     bool
48         ServerConfig *xsapiv1.APIConfig
49
50         // Events management
51         CBOnError      func(error)
52         CBOnDisconnect func(error)
53         sockEvents     map[string][]*caller
54         sockEventsLock *sync.Mutex
55
56         // Private fields
57         client      *common.HTTPClient
58         ioSock      *sio_client.Client
59         logOut      io.Writer
60         apiRouter   *gin.RouterGroup
61         cmdList     map[string]interface{}
62         cbOnConnect OnConnectedCB
63 }
64
65 // EventCB Event emitter callback
66 type EventCB func(privData interface{}, evtData interface{}) error
67
68 // OnConnectedCB connect callback
69 type OnConnectedCB func(svr *XdsServer) error
70
71 // caller Used to chain event listeners
72 type caller struct {
73         id          uuid.UUID
74         EventName   string
75         Func        EventCB
76         PrivateData interface{}
77 }
78
79 const _IDTempoPrefix = "tempo-"
80
81 // NewXdsServer creates an instance of XdsServer
82 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
83         return &XdsServer{
84                 Context:    ctx,
85                 ID:         _IDTempoPrefix + uuid.NewV1().String(),
86                 BaseURL:    conf.URL,
87                 APIURL:     conf.APIBaseURL + conf.APIPartialURL,
88                 PartialURL: conf.APIPartialURL,
89                 ConnRetry:  conf.ConnRetry,
90                 Connected:  false,
91                 Disabled:   false,
92
93                 sockEvents:     make(map[string][]*caller),
94                 sockEventsLock: &sync.Mutex{},
95                 logOut:         ctx.Log.Out,
96                 cmdList:        make(map[string]interface{}),
97         }
98 }
99
100 // Close Free and close XDS Server connection
101 func (xs *XdsServer) Close() error {
102         err := xs._Disconnected()
103         xs.Disabled = true
104         return err
105 }
106
107 // Connect Establish HTTP connection with XDS Server
108 func (xs *XdsServer) Connect() error {
109         var err error
110         var retry int
111
112         xs.Disabled = false
113         xs.Connected = false
114
115         err = nil
116         for retry = xs.ConnRetry; retry > 0; retry-- {
117                 if err = xs._CreateConnectHTTP(); err == nil {
118                         break
119                 }
120                 if retry == xs.ConnRetry {
121                         // Notify only on the first conn error
122                         // doing that avoid 2 notifs (conn false; conn true) on startup
123                         xs._NotifyState()
124                 }
125                 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
126                 time.Sleep(time.Second)
127         }
128         if retry == 0 {
129                 // FIXME: re-use _Reconnect to wait longer in background
130                 return fmt.Errorf("Connection to XDS Server failure")
131         }
132         if err != nil {
133                 return err
134         }
135
136         // Check HTTP connection and establish WS connection
137         err = xs._Connect(false)
138
139         return err
140 }
141
142 // ConnectOn Register a callback on events reception
143 func (xs *XdsServer) ConnectOn(f OnConnectedCB) error {
144         xs.cbOnConnect = f
145         return nil
146 }
147
148 // IsTempoID returns true when server as a temporary id
149 func (xs *XdsServer) IsTempoID() bool {
150         return strings.HasPrefix(xs.ID, _IDTempoPrefix)
151 }
152
153 // SetLoggerOutput Set logger ou
154 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
155         xs.logOut = out
156 }
157
158 // SendCommand Send a command to XDS Server
159 func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error {
160         url := cmd
161         if !strings.HasPrefix("/", cmd) {
162                 url = "/" + cmd
163         }
164         return xs.client.Post(url, string(body), res)
165 }
166
167 // GetVersion Send Get request to retrieve XDS Server version
168 func (xs *XdsServer) GetVersion(res interface{}) error {
169         return xs.client.Get("/version", &res)
170 }
171
172 // GetFolders Send GET request to get current folder configuration
173 func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error {
174         return xs.client.Get("/folders", folders)
175 }
176
177 // FolderAdd Send POST request to add a folder
178 func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error {
179         err := xs.client.Post("/folders", fld, res)
180         if err != nil {
181                 return fmt.Errorf("FolderAdd error: %s", err.Error())
182         }
183         return err
184 }
185
186 // FolderDelete Send DELETE request to delete a folder
187 func (xs *XdsServer) FolderDelete(id string) error {
188         return xs.client.HTTPDelete("/folders/" + id)
189 }
190
191 // FolderSync Send POST request to force synchronization of a folder
192 func (xs *XdsServer) FolderSync(id string) error {
193         return xs.client.HTTPPost("/folders/sync/"+id, "")
194 }
195
196 // FolderUpdate Send PUT request to update a folder
197 func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error {
198         return xs.client.Put("/folders/"+fld.ID, fld, resFld)
199 }
200
201 // CommandExec Send POST request to execute a command
202 func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error {
203         return xs.client.Post("/exec", args, res)
204 }
205
206 // CommandSignal Send POST request to send a signal to a command
207 func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error {
208         return xs.client.Post("/signal", args, res)
209 }
210
211 // SetAPIRouterGroup .
212 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
213         xs.apiRouter = r
214 }
215
216 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
217 func (xs *XdsServer) PassthroughGet(url string) {
218         if xs.apiRouter == nil {
219                 xs.Log.Errorf("apiRouter not set !")
220                 return
221         }
222
223         xs.apiRouter.GET(url, func(c *gin.Context) {
224                 var data interface{}
225                 // Take care of param (eg. id in /projects/:id)
226                 nURL := url
227                 if strings.Contains(url, ":") {
228                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
229                 }
230                 // Send Get request
231                 if err := xs.client.Get(nURL, &data); err != nil {
232                         if strings.Contains(err.Error(), "connection refused") {
233                                 xs._Disconnected()
234                         }
235                         common.APIError(c, err.Error())
236                         return
237                 }
238
239                 c.JSON(http.StatusOK, data)
240         })
241 }
242
243 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
244 func (xs *XdsServer) PassthroughPost(url string) {
245         if xs.apiRouter == nil {
246                 xs.Log.Errorf("apiRouter not set !")
247                 return
248         }
249
250         xs.apiRouter.POST(url, func(c *gin.Context) {
251                 var err error
252                 var data interface{}
253
254                 // Get raw body
255                 body, err := c.GetRawData()
256                 if err != nil {
257                         common.APIError(c, err.Error())
258                         return
259                 }
260
261                 // Take care of param (eg. id in /projects/:id)
262                 nURL := url
263                 if strings.Contains(url, ":") {
264                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
265                 }
266
267                 // Send Post request
268                 response, err := xs.client.HTTPPostWithRes(nURL, string(body))
269                 if err != nil {
270                         goto httpError
271                 }
272                 if response.StatusCode != 200 {
273                         err = fmt.Errorf(response.Status)
274                         return
275                 }
276                 err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
277                 if err != nil {
278                         goto httpError
279                 }
280
281                 c.JSON(http.StatusOK, data)
282                 return
283
284                 /* Handle error case */
285         httpError:
286                 if strings.Contains(err.Error(), "connection refused") {
287                         xs._Disconnected()
288                 }
289                 common.APIError(c, err.Error())
290         })
291 }
292
293 // PassthroughDelete Used to declare a route that sends directly a Delete request to XDS Server
294 func (xs *XdsServer) PassthroughDelete(url string) {
295         if xs.apiRouter == nil {
296                 xs.Log.Errorf("apiRouter not set !")
297                 return
298         }
299
300         xs.apiRouter.DELETE(url, func(c *gin.Context) {
301                 var err error
302                 var data interface{}
303
304                 // Take care of param (eg. id in /projects/:id)
305                 nURL := url
306                 if strings.Contains(url, ":") {
307                         nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
308                 }
309
310                 // Send Post request
311                 response, err := xs.client.HTTPDeleteWithRes(nURL)
312                 if err != nil {
313                         goto httpError
314                 }
315                 if response.StatusCode != 200 {
316                         err = fmt.Errorf(response.Status)
317                         return
318                 }
319                 err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
320                 if err != nil {
321                         goto httpError
322                 }
323
324                 c.JSON(http.StatusOK, data)
325                 return
326
327                 /* Handle error case */
328         httpError:
329                 if strings.Contains(err.Error(), "connection refused") {
330                         xs._Disconnected()
331                 }
332                 common.APIError(c, err.Error())
333         })
334 }
335
336 // EventRegister Post a request to register to an XdsServer event
337 func (xs *XdsServer) EventRegister(evName string, filter string) error {
338         return xs.client.Post("/events/register",
339                 xsapiv1.EventRegisterArgs{
340                         Name:   evName,
341                         Filter: filter,
342                 },
343                 nil)
344 }
345
346 // EventEmit Emit a event to XDS Server through WS
347 func (xs *XdsServer) EventEmit(message string, args ...interface{}) error {
348         if xs.ioSock == nil {
349                 return fmt.Errorf("Io.Socket not initialized")
350         }
351
352         return xs.ioSock.Emit(message, args...)
353 }
354
355 // EventOn Register a callback on events reception
356 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
357         if xs.ioSock == nil {
358                 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
359         }
360
361         xs.sockEventsLock.Lock()
362         defer xs.sockEventsLock.Unlock()
363
364         if _, exist := xs.sockEvents[evName]; !exist {
365                 // Register listener only the first time
366                 evn := evName
367
368                 err := xs.ioSock.On(evn, func(data interface{}) error {
369                         xs.sockEventsLock.Lock()
370                         sEvts := make([]*caller, len(xs.sockEvents[evn]))
371                         copy(sEvts, xs.sockEvents[evn])
372                         xs.sockEventsLock.Unlock()
373                         for _, c := range sEvts {
374                                 c.Func(c.PrivateData, data)
375                         }
376                         return nil
377                 })
378                 if err != nil {
379                         return uuid.Nil, err
380                 }
381         }
382
383         c := &caller{
384                 id:          uuid.NewV1(),
385                 EventName:   evName,
386                 Func:        f,
387                 PrivateData: privData,
388         }
389
390         xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
391         xs.LogSillyf("XS EventOn: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
392         return c.id, nil
393 }
394
395 // EventOff Un-register a (or all) callbacks associated to an event
396 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
397         xs.sockEventsLock.Lock()
398         defer xs.sockEventsLock.Unlock()
399         if _, exist := xs.sockEvents[evName]; exist {
400                 if id == uuid.Nil {
401                         // Un-register all
402                         xs.sockEvents[evName] = []*caller{}
403                 } else {
404                         // Un-register only the specified callback
405                         for i, ff := range xs.sockEvents[evName] {
406                                 if uuid.Equal(ff.id, id) {
407                                         xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
408                                         break
409                                 }
410                         }
411                 }
412         }
413         xs.LogSillyf("XS EventOff: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
414         return nil
415 }
416
417 // ProjectToFolder Convert Project structure to Folder structure
418 func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig {
419         stID := ""
420         if pPrj.Type == xsapiv1.TypeCloudSync {
421                 stID, _ = xs.SThg.IDGet()
422         }
423         // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
424         fPrj := xsapiv1.FolderConfig{
425                 ID:         pPrj.ID,
426                 Label:      pPrj.Label,
427                 ClientPath: pPrj.ClientPath,
428                 Type:       xsapiv1.FolderType(pPrj.Type),
429                 Status:     pPrj.Status,
430                 IsInSync:   pPrj.IsInSync,
431                 DefaultSdk: pPrj.DefaultSdk,
432                 ClientData: pPrj.ClientData,
433                 DataPathMap: xsapiv1.PathMapConfig{
434                         ServerPath: pPrj.ServerPath,
435                 },
436                 DataCloudSync: xsapiv1.CloudSyncConfig{
437                         SyncThingID:   stID,
438                         STLocIsInSync: pPrj.IsInSync,
439                         STLocStatus:   pPrj.Status,
440                         STSvrIsInSync: pPrj.IsInSync,
441                         STSvrStatus:   pPrj.Status,
442                 },
443         }
444
445         return &fPrj
446 }
447
448 // FolderToProject Convert Folder structure to Project structure
449 func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig {
450         inSync := fPrj.IsInSync
451         sts := fPrj.Status
452
453         if fPrj.Type == xsapiv1.TypeCloudSync {
454                 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
455
456                 sts = fPrj.DataCloudSync.STSvrStatus
457                 switch fPrj.DataCloudSync.STLocStatus {
458                 case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause:
459                         sts = fPrj.DataCloudSync.STLocStatus
460                         break
461                 case xaapiv1.StatusSyncing:
462                         if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause {
463                                 sts = xaapiv1.StatusSyncing
464                         }
465                         break
466                 case xaapiv1.StatusEnable:
467                         // keep STSvrStatus
468                         break
469                 }
470         }
471
472         pPrj := xaapiv1.ProjectConfig{
473                 ID:         fPrj.ID,
474                 ServerID:   xs.ID,
475                 Label:      fPrj.Label,
476                 ClientPath: fPrj.ClientPath,
477                 ServerPath: fPrj.DataPathMap.ServerPath,
478                 Type:       xaapiv1.ProjectType(fPrj.Type),
479                 Status:     sts,
480                 IsInSync:   inSync,
481                 DefaultSdk: fPrj.DefaultSdk,
482                 ClientData: fPrj.ClientData,
483         }
484         return pPrj
485 }
486
487 // CommandAdd Add a new command to the list of running commands
488 func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error {
489         if xs.CommandGet(cmdID) != nil {
490                 return fmt.Errorf("command id already exist")
491         }
492         xs.cmdList[cmdID] = data
493         return nil
494 }
495
496 // CommandDelete Delete a command from the command list
497 func (xs *XdsServer) CommandDelete(cmdID string) error {
498         if xs.CommandGet(cmdID) == nil {
499                 return fmt.Errorf("unknown command id")
500         }
501         delete(xs.cmdList, cmdID)
502         return nil
503 }
504
505 // CommandGet Retrieve a command data
506 func (xs *XdsServer) CommandGet(cmdID string) interface{} {
507         d, exist := xs.cmdList[cmdID]
508         if exist {
509                 return d
510         }
511         return nil
512 }
513
514 /***
515 ** Private functions
516 ***/
517
518 // Create HTTP client
519 func (xs *XdsServer) _CreateConnectHTTP() error {
520         var err error
521         xs.client, err = common.HTTPNewClient(xs.BaseURL,
522                 common.HTTPClientConfig{
523                         URLPrefix:           "/api/v1",
524                         HeaderClientKeyName: "Xds-Sid",
525                         CsrfDisable:         true,
526                         LogOut:              xs.logOut,
527                         LogPrefix:           "XDSSERVER: ",
528                         LogLevel:            common.HTTPLogLevelWarning,
529                 })
530
531         xs.client.SetLogLevel(xs.Log.Level.String())
532
533         if err != nil {
534                 msg := ": " + err.Error()
535                 if strings.Contains(err.Error(), "connection refused") {
536                         msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
537                 }
538                 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
539         }
540         if xs.client == nil {
541                 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
542         }
543
544         return nil
545 }
546
547 // _Reconnect Re-established connection
548 func (xs *XdsServer) _Reconnect() error {
549
550         // Note that ConnectOn callback will be called (see apiv1.go file)
551         err := xs._Connect(true)
552
553         return err
554 }
555
556 // _Connect Established HTTP and WS connection and retrieve XDSServer config
557 func (xs *XdsServer) _Connect(reConn bool) error {
558
559         xdsCfg := xsapiv1.APIConfig{}
560         if err := xs.client.Get("/config", &xdsCfg); err != nil {
561                 xs.Connected = false
562                 if !reConn {
563                         xs._NotifyState()
564                 }
565                 return err
566         }
567
568         if reConn && xs.ID != xdsCfg.ServerUID {
569                 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID)
570         }
571
572         // Update local XDS config
573         xs.ID = xdsCfg.ServerUID
574         xs.ServerConfig = &xdsCfg
575
576         // Establish WS connection and register listen
577         if err := xs._SocketConnect(); err != nil {
578                 xs._Disconnected()
579                 return err
580         }
581
582         xs.Connected = true
583
584         // Call OnConnect callback
585         if xs.cbOnConnect != nil {
586                 xs.cbOnConnect(xs)
587         }
588
589         xs._NotifyState()
590         return nil
591 }
592
593 // _SocketConnect Create WebSocket (io.socket) connection
594 func (xs *XdsServer) _SocketConnect() error {
595
596         xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
597
598         opts := &sio_client.Options{
599                 Transport: "websocket",
600                 Header:    make(map[string][]string),
601         }
602         opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
603
604         iosk, err := sio_client.NewClient(xs.BaseURL, opts)
605         if err != nil {
606                 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
607         }
608         xs.ioSock = iosk
609
610         // Register some listeners
611
612         iosk.On("error", func(err error) {
613                 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
614                 if xs.CBOnError != nil {
615                         xs.CBOnError(err)
616                 }
617         })
618
619         iosk.On("disconnection", func(err error) {
620                 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
621                 if xs.CBOnDisconnect != nil {
622                         xs.CBOnDisconnect(err)
623                 }
624                 xs._Disconnected()
625
626                 // Try to reconnect during 15min (or at least while not disabled)
627                 go func() {
628                         count := 0
629                         waitTime := 1
630                         for !xs.Disabled && !xs.Connected {
631                                 count++
632                                 if count%60 == 0 {
633                                         waitTime *= 5
634                                 }
635                                 if waitTime > 15*60 {
636                                         xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
637                                         return
638                                 }
639                                 time.Sleep(time.Second * time.Duration(waitTime))
640                                 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
641
642                                 err := xs._Reconnect()
643                                 if err != nil &&
644                                         !(strings.Contains(err.Error(), "dial tcp") && strings.Contains(err.Error(), "connection refused")) {
645                                         xs.Log.Errorf("ERROR while reconnecting: %v", err.Error())
646                                 }
647
648                         }
649                 }()
650         })
651
652         // XXX - There is no connection event generated so, just consider that
653         // we are connected when NewClient return successfully
654         /* iosk.On("connection", func() { ... }) */
655         xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
656
657         return nil
658 }
659
660 // _Disconnected Set XDS Server as disconnected
661 func (xs *XdsServer) _Disconnected() error {
662         // Clear all register events as socket is closed
663         for k := range xs.sockEvents {
664                 delete(xs.sockEvents, k)
665         }
666         xs.Connected = false
667         xs.ioSock = nil
668         xs._NotifyState()
669         return nil
670 }
671
672 // _NotifyState Send event to notify changes
673 func (xs *XdsServer) _NotifyState() {
674
675         evSts := xaapiv1.ServerCfg{
676                 ID:         xs.ID,
677                 URL:        xs.BaseURL,
678                 APIURL:     xs.APIURL,
679                 PartialURL: xs.PartialURL,
680                 ConnRetry:  xs.ConnRetry,
681                 Connected:  xs.Connected,
682         }
683         if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil {
684                 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
685         }
686 }