2 * Copyright (C) 2017-2018 "IoT.bzh"
3 * Author Sebastien Douheret <sebastien@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
29 "gerrit.automotivelinux.org/gerrit/src/xds/xds-agent/lib/xaapiv1"
30 "gerrit.automotivelinux.org/gerrit/src/xds/xds-agent/lib/xdsconfig"
31 common "gerrit.automotivelinux.org/gerrit/src/xds/xds-common.git/golib"
32 "gerrit.automotivelinux.org/gerrit/src/xds/xds-server.git/lib/xsapiv1"
33 "github.com/gin-gonic/gin"
34 uuid "github.com/satori/go.uuid"
35 sio_client "github.com/sebd71/go-socket.io-client"
39 type XdsServer struct {
48 ServerConfig *xsapiv1.APIConfig
52 CBOnDisconnect func(error)
53 sockEvents map[string][]*caller
54 sockEventsLock *sync.Mutex
57 client *common.HTTPClient
58 ioSock *sio_client.Client
60 apiRouter *gin.RouterGroup
61 cmdList map[string]interface{}
62 cbOnConnect OnConnectedCB
65 // EventCB Event emitter callback
66 type EventCB func(privData interface{}, evtData interface{}) error
68 // OnConnectedCB connect callback
69 type OnConnectedCB func(svr *XdsServer) error
71 // caller Used to chain event listeners
76 PrivateData interface{}
79 const _IDTempoPrefix = "tempo-"
81 // NewXdsServer creates an instance of XdsServer
82 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
85 ID: _IDTempoPrefix + uuid.NewV1().String(),
87 APIURL: conf.APIBaseURL + conf.APIPartialURL,
88 PartialURL: conf.APIPartialURL,
89 ConnRetry: conf.ConnRetry,
93 sockEvents: make(map[string][]*caller),
94 sockEventsLock: &sync.Mutex{},
96 cmdList: make(map[string]interface{}),
100 // Close Free and close XDS Server connection
101 func (xs *XdsServer) Close() error {
102 err := xs._Disconnected()
107 // Connect Establish HTTP connection with XDS Server
108 func (xs *XdsServer) Connect() error {
116 for retry = xs.ConnRetry; retry > 0; retry-- {
117 if err = xs._CreateConnectHTTP(); err == nil {
120 if retry == xs.ConnRetry {
121 // Notify only on the first conn error
122 // doing so avoids 2 notifications (conn false; conn true) on startup
125 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
126 time.Sleep(time.Second)
129 // FIXME: re-use _Reconnect to wait longer in background
130 return fmt.Errorf("Connection to XDS Server failure")
136 // Check HTTP connection and establish WS connection
137 err = xs._Connect(false)
142 // ConnectOn Register a callback on server connection
143 func (xs *XdsServer) ConnectOn(f OnConnectedCB) error {
148 // IsTempoID returns true when server as a temporary id
149 func (xs *XdsServer) IsTempoID() bool {
150 return strings.HasPrefix(xs.ID, _IDTempoPrefix)
153 // SetLoggerOutput Set logger output
154 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
158 // SendCommand Send a command to XDS Server
159 func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error {
161 if !strings.HasPrefix("/", cmd) {
164 return xs.client.Post(url, string(body), res)
167 // GetVersion Send Get request to retrieve XDS Server version
168 func (xs *XdsServer) GetVersion(res interface{}) error {
169 return xs.client.Get("/version", &res)
172 // GetFolders Send GET request to get current folder configuration
173 func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error {
174 return xs.client.Get("/folders", folders)
177 // FolderAdd Send POST request to add a folder
178 func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error {
179 err := xs.client.Post("/folders", fld, res)
181 return fmt.Errorf("FolderAdd error: %s", err.Error())
186 // FolderDelete Send DELETE request to delete a folder
187 func (xs *XdsServer) FolderDelete(id string) error {
188 return xs.client.HTTPDelete("/folders/" + id)
191 // FolderSync Send POST request to force synchronization of a folder
192 func (xs *XdsServer) FolderSync(id string) error {
193 return xs.client.HTTPPost("/folders/sync/"+id, "")
196 // FolderUpdate Send PUT request to update a folder
197 func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error {
198 return xs.client.Put("/folders/"+fld.ID, fld, resFld)
201 // CommandExec Send POST request to execute a command
202 func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error {
203 return xs.client.Post("/exec", args, res)
206 // CommandSignal Send POST request to send a signal to a command
207 func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error {
208 return xs.client.Post("/signal", args, res)
211 // CommandTgtTerminalGet Send GET request to retrieve info of a target terminals
212 func (xs *XdsServer) CommandTgtTerminalGet(targetID, termID string, res *xsapiv1.TerminalConfig) error {
213 return xs.client.Get("/targets/"+targetID+"/terminals/"+termID, res)
216 // CommandTgtTerminalOpen Send POST request to open a target terminal
217 func (xs *XdsServer) CommandTgtTerminalOpen(targetID string, termID string, res *xsapiv1.TerminalConfig) error {
218 var empty interface{}
219 return xs.client.Post("/targets/"+targetID+"/terminals/"+termID+"/open", &empty, res)
222 // CommandTgtTerminalSignal Send POST request to send a signal to a target terminal
223 func (xs *XdsServer) CommandTgtTerminalSignal(args *xsapiv1.TerminalSignalArgs, res *xsapiv1.TerminalConfig) error {
224 return xs.client.Post("/signal", args, res)
227 // SetAPIRouterGroup Set the gin router group used to declare the passthrough routes
228 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
232 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
233 func (xs *XdsServer) PassthroughGet(url string) {
234 if xs.apiRouter == nil {
235 xs.Log.Errorf("apiRouter not set !")
239 xs.apiRouter.GET(url, func(c *gin.Context) {
241 // Take care of param (eg. id in /projects/:id)
243 if strings.Contains(url, ":") {
244 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
247 if err := xs.client.Get(nURL, &data); err != nil {
248 if strings.Contains(err.Error(), "connection refused") {
251 common.APIError(c, err.Error())
255 c.JSON(http.StatusOK, data)
259 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
260 func (xs *XdsServer) PassthroughPost(url string) {
261 if xs.apiRouter == nil {
262 xs.Log.Errorf("apiRouter not set !")
266 xs.apiRouter.POST(url, func(c *gin.Context) {
271 body, err := c.GetRawData()
273 common.APIError(c, err.Error())
277 // Take care of param (eg. id in /projects/:id)
279 if strings.Contains(url, ":") {
280 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
284 response, err := xs.client.HTTPPostWithRes(nURL, string(body))
288 if response.StatusCode != 200 {
289 err = fmt.Errorf(response.Status)
292 err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
297 c.JSON(http.StatusOK, data)
300 /* Handle error case */
302 if strings.Contains(err.Error(), "connection refused") {
305 common.APIError(c, err.Error())
309 // PassthroughPut Used to declare a route that sends directly a PUT request to XDS Server
310 func (xs *XdsServer) PassthroughPut(url string) {
311 if xs.apiRouter == nil {
312 xs.Log.Errorf("apiRouter not set !")
316 xs.apiRouter.PUT(url, func(c *gin.Context) {
321 body, err := c.GetRawData()
323 common.APIError(c, err.Error())
327 // Take care of param (eg. id in /projects/:id)
329 if strings.Contains(url, ":") {
330 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
334 response, err := xs.client.HTTPPutWithRes(nURL, string(body))
338 if response.StatusCode != 200 {
339 err = fmt.Errorf(response.Status)
342 err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
347 c.JSON(http.StatusOK, data)
350 /* Handle error case */
352 if strings.Contains(err.Error(), "connection refused") {
355 common.APIError(c, err.Error())
359 // PassthroughDelete Used to declare a route that sends directly a DELETE request to XDS Server
360 func (xs *XdsServer) PassthroughDelete(url string) {
361 if xs.apiRouter == nil {
362 xs.Log.Errorf("apiRouter not set !")
366 xs.apiRouter.DELETE(url, func(c *gin.Context) {
370 // Take care of param (eg. id in /projects/:id)
372 if strings.Contains(url, ":") {
373 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
377 response, err := xs.client.HTTPDeleteWithRes(nURL)
381 if response.StatusCode != 200 {
382 err = fmt.Errorf(response.Status)
385 err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
390 c.JSON(http.StatusOK, data)
393 /* Handle error case */
395 if strings.Contains(err.Error(), "connection refused") {
398 common.APIError(c, err.Error())
402 // EventRegister Post a request to register to an XdsServer event
403 func (xs *XdsServer) EventRegister(evName string, filter string) error {
404 return xs.client.Post("/events/register",
405 xsapiv1.EventRegisterArgs{
412 // EventEmit Emit a event to XDS Server through WS
413 func (xs *XdsServer) EventEmit(message string, args ...interface{}) error {
414 if xs.ioSock == nil {
415 return fmt.Errorf("Io.Socket not initialized")
418 return xs.ioSock.Emit(message, args...)
421 // EventOn Register a callback on events reception
422 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
423 if xs.ioSock == nil {
424 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
427 xs.sockEventsLock.Lock()
428 defer xs.sockEventsLock.Unlock()
430 if _, exist := xs.sockEvents[evName]; !exist {
431 // Register listener only the first time
434 err := xs.ioSock.On(evn, func(data interface{}) error {
435 xs.sockEventsLock.Lock()
436 sEvts := make([]*caller, len(xs.sockEvents[evn]))
437 copy(sEvts, xs.sockEvents[evn])
438 xs.sockEventsLock.Unlock()
439 for _, c := range sEvts {
440 c.Func(c.PrivateData, data)
453 PrivateData: privData,
456 xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
457 xs.LogSillyf("XS EventOn: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
461 // EventOff Un-register one (or all) callbacks associated with an event
462 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
463 xs.sockEventsLock.Lock()
464 defer xs.sockEventsLock.Unlock()
465 if _, exist := xs.sockEvents[evName]; exist {
468 xs.sockEvents[evName] = []*caller{}
470 // Un-register only the specified callback
471 for i, ff := range xs.sockEvents[evName] {
472 if uuid.Equal(ff.id, id) {
473 xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
479 xs.LogSillyf("XS EventOff: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
483 // ProjectToFolder Convert Project structure to Folder structure
484 func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig {
486 if pPrj.Type == xsapiv1.TypeCloudSync {
487 stID, _ = xs.SThg.IDGet()
489 // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
490 fPrj := xsapiv1.FolderConfig{
493 ClientPath: pPrj.ClientPath,
494 Type: xsapiv1.FolderType(pPrj.Type),
496 IsInSync: pPrj.IsInSync,
497 DefaultSdk: pPrj.DefaultSdk,
498 ClientData: pPrj.ClientData,
499 DataPathMap: xsapiv1.PathMapConfig{
500 ServerPath: pPrj.ServerPath,
502 DataCloudSync: xsapiv1.CloudSyncConfig{
504 STLocIsInSync: pPrj.IsInSync,
505 STLocStatus: pPrj.Status,
506 STSvrIsInSync: pPrj.IsInSync,
507 STSvrStatus: pPrj.Status,
514 // FolderToProject Convert Folder structure to Project structure
515 func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig {
516 inSync := fPrj.IsInSync
519 if fPrj.Type == xsapiv1.TypeCloudSync {
520 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
522 sts = fPrj.DataCloudSync.STSvrStatus
523 switch fPrj.DataCloudSync.STLocStatus {
524 case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause:
525 sts = fPrj.DataCloudSync.STLocStatus
527 case xaapiv1.StatusSyncing:
528 if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause {
529 sts = xaapiv1.StatusSyncing
532 case xaapiv1.StatusEnable:
538 pPrj := xaapiv1.ProjectConfig{
542 ClientPath: fPrj.ClientPath,
543 ServerPath: fPrj.DataPathMap.ServerPath,
544 Type: xaapiv1.ProjectType(fPrj.Type),
547 DefaultSdk: fPrj.DefaultSdk,
548 ClientData: fPrj.ClientData,
553 // CommandAdd Add a new command to the list of running commands
554 func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error {
555 if xs.CommandGet(cmdID) != nil {
556 return fmt.Errorf("command id already exist")
558 xs.cmdList[cmdID] = data
562 // CommandDelete Delete a command from the command list
563 func (xs *XdsServer) CommandDelete(cmdID string) error {
564 if xs.CommandGet(cmdID) == nil {
565 return fmt.Errorf("unknown command id")
567 delete(xs.cmdList, cmdID)
571 // CommandGet Retrieve a command data
572 func (xs *XdsServer) CommandGet(cmdID string) interface{} {
573 d, exist := xs.cmdList[cmdID]
584 // Create HTTP client
585 func (xs *XdsServer) _CreateConnectHTTP() error {
587 xs.client, err = common.HTTPNewClient(xs.BaseURL,
588 common.HTTPClientConfig{
589 URLPrefix: "/api/v1",
590 HeaderClientKeyName: "Xds-Sid",
593 LogPrefix: "XDSSERVER: ",
594 LogLevel: common.HTTPLogLevelWarning,
597 xs.client.SetLogLevel(xs.Log.Level.String())
600 msg := ": " + err.Error()
601 if strings.Contains(err.Error(), "connection refused") {
602 msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
604 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
606 if xs.client == nil {
607 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
613 // _Reconnect Re-establish connection
614 func (xs *XdsServer) _Reconnect() error {
616 // Note that ConnectOn callback will be called (see apiv1.go file)
617 err := xs._Connect(true)
622 // _Connect Establish HTTP and WS connection and retrieve XDSServer config
623 func (xs *XdsServer) _Connect(reConn bool) error {
625 xdsCfg := xsapiv1.APIConfig{}
626 if err := xs.client.Get("/config", &xdsCfg); err != nil {
634 if reConn && xs.ID != xdsCfg.ServerUID {
635 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID)
638 // Update local XDS config
639 xs.ID = xdsCfg.ServerUID
640 xs.ServerConfig = &xdsCfg
642 // Establish WS connection and register listeners
643 if err := xs._SocketConnect(); err != nil {
650 // Call OnConnect callback
651 if xs.cbOnConnect != nil {
659 // _SocketConnect Create WebSocket (io.socket) connection
660 func (xs *XdsServer) _SocketConnect() error {
662 xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
664 opts := &sio_client.Options{
665 Transport: "websocket",
666 Header: make(map[string][]string),
668 opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
670 iosk, err := sio_client.NewClient(xs.BaseURL, opts)
672 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
676 // Register some listeners
678 iosk.On("error", func(err error) {
679 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
680 if xs.CBOnError != nil {
685 iosk.On("disconnection", func(err error) {
686 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
687 if xs.CBOnDisconnect != nil {
688 xs.CBOnDisconnect(err)
692 // Try to reconnect during 15min (or at least while not disabled)
696 for !xs.Disabled && !xs.Connected {
701 if waitTime > 15*60 {
702 xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
705 time.Sleep(time.Second * time.Duration(waitTime))
706 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
708 err := xs._Reconnect()
710 !(strings.Contains(err.Error(), "dial tcp") && strings.Contains(err.Error(), "connection refused")) {
711 xs.Log.Errorf("ERROR while reconnecting: %v", err.Error())
718 // XXX - There is no connection event generated so, just consider that
719 // we are connected when NewClient return successfully
720 /* iosk.On("connection", func() { ... }) */
721 xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
726 // _Disconnected Set XDS Server as disconnected
727 func (xs *XdsServer) _Disconnected() error {
728 // Clear all register events as socket is closed
729 xs.sockEventsLock.Lock()
730 defer xs.sockEventsLock.Unlock()
732 for k := range xs.sockEvents {
733 delete(xs.sockEvents, k)
741 // _NotifyState Send event to notify changes
742 func (xs *XdsServer) _NotifyState() {
744 evSts := xaapiv1.ServerCfg{
748 PartialURL: xs.PartialURL,
749 ConnRetry: xs.ConnRetry,
750 Connected: xs.Connected,
752 if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil {
753 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)