2 * Copyright (C) 2017-2018 "IoT.bzh"
3 * Author Sebastien Douheret <sebastien@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
29 "gerrit.automotivelinux.org/gerrit/src/xds/xds-agent/lib/xaapiv1"
30 "gerrit.automotivelinux.org/gerrit/src/xds/xds-agent/lib/xdsconfig"
31 common "gerrit.automotivelinux.org/gerrit/src/xds/xds-common.git/golib"
32 "gerrit.automotivelinux.org/gerrit/src/xds/xds-server.git/lib/xsapiv1"
33 "github.com/gin-gonic/gin"
34 uuid "github.com/satori/go.uuid"
35 sio_client "github.com/sebd71/go-socket.io-client"
// XdsServer holds the state of one connection to an XDS Server.
// Only part of the field list is visible in this excerpt.
39 type XdsServer struct {
// ServerConfig is the configuration retrieved from the server; _Connect
// stores the result of its GET "/config" request here.
49 ServerConfig *xsapiv1.APIConfig
// CBOnDisconnect, when non-nil, is called with the error received on the
// socket "disconnection" event.
53 CBOnDisconnect func(error)
// sockEvents maps an event name to the listeners registered for it through
// EventOn; every access is made while holding sockEventsLock.
54 sockEvents map[string][]*caller
55 sockEventsLock *sync.Mutex
// client is the HTTP client used for all REST requests to the server.
58 client *common.HTTPClient
// ioSock is the socket.io client; EventEmit and EventOn fail while it is nil.
59 ioSock *sio_client.Client
// apiRouter is the gin group on which the Passthrough* helpers declare routes.
61 apiRouter *gin.RouterGroup
// cmdList stores per-command data keyed by command id (see CommandAdd,
// CommandDelete and CommandGet).
62 cmdList map[string]interface{}
// cbOnConnect is the optional callback checked by _Connect once connected.
63 cbOnConnect OnConnectedCB
// EventCB is the signature of an event listener callback. It receives the
// private data supplied at registration time and the data carried by the
// event, and reports failure through the returned error.
type EventCB func(privData, evtData interface{}) error
69 // OnConnectedCB is the signature of the callback associated with a successful connection
// It receives the server instance and returns an error.
70 type OnConnectedCB func(svr *XdsServer) error
72 // caller Used to chain event listeners
// Each entry describes one listener stored in XdsServer.sockEvents; EventOff
// identifies entries by their id field.
// PrivateData is the opaque value passed as first argument to the listener's Func.
77 PrivateData interface{}
// _IDTempoPrefix is the prefix of the temporary ID that NewXdsServer gives to
// a server; IsTempoID tests for it.
80 const _IDTempoPrefix = "tempo-"
82 // NewXdsServer creates an instance of XdsServer
// The instance is initialized from conf. Its ID is temporary at this stage
// (see _IDTempoPrefix); _Connect later replaces it with the server UID.
83 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
// Temporary ID: prefix followed by a freshly generated V1 UUID.
86 ID: _IDTempoPrefix + uuid.NewV1().String(),
87 URLIndex: conf.URLIndex,
// Full API URL is the base URL followed by the partial URL.
89 APIURL: conf.APIBaseURL + conf.APIPartialURL,
90 PartialURL: conf.APIPartialURL,
91 ConnRetry: conf.ConnRetry,
// Maps and mutex are allocated up front so they can be used without nil checks.
95 sockEvents: make(map[string][]*caller),
96 sockEventsLock: &sync.Mutex{},
98 cmdList: make(map[string]interface{}),
102 // Close Free and close XDS Server connection
// It starts by calling _Disconnected, which clears the registered socket
// events, and keeps the resulting error.
103 func (xs *XdsServer) Close() error {
104 err := xs._Disconnected()
109 // Connect Establish HTTP connection with XDS Server
// The HTTP client creation is attempted in a loop bounded by xs.ConnRetry,
// with a one second pause between attempts; _Connect(false) is then used to
// check the HTTP connection and open the WS connection.
110 func (xs *XdsServer) Connect() error {
// retry counts down from xs.ConnRetry to 1.
118 for retry = xs.ConnRetry; retry > 0; retry-- {
119 if err = xs._CreateConnectHTTP(); err == nil {
122 if retry == xs.ConnRetry {
123 // Notify only on the first conn error
124 // doing that avoid 2 notifs (conn false; conn true) on startup
// NOTE(review): retry is a countdown, so this message reads N/N, N-1/N, ...
127 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
128 time.Sleep(time.Second)
131 // FIXME: re-use _Reconnect to wait longer in background
132 return fmt.Errorf("Connection to XDS Server failure")
138 // Check HTTP connection and establish WS connection
139 err = xs._Connect(false)
144 // ConnectOn registers a callback of type OnConnectedCB
// NOTE(review): the body is not visible in this excerpt; presumably f is
// stored in xs.cbOnConnect, which _Connect checks after connecting - confirm.
145 func (xs *XdsServer) ConnectOn(f OnConnectedCB) error {
150 // IsTempoID returns true when server as a temporary id
151 func (xs *XdsServer) IsTempoID() bool {
152 return strings.HasPrefix(xs.ID, _IDTempoPrefix)
155 // SetLoggerOutput Set logger output
// out is the writer to use. NOTE(review): the body is not visible in this
// excerpt, so how out is stored or applied cannot be confirmed here.
156 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
160 // GetConfig return the current server config
// The result is a xaapiv1.ServerCfg value built from the current fields of
// the server, including its connection and disabled flags.
161 func (xs *XdsServer) GetConfig() xaapiv1.ServerCfg {
162 return xaapiv1.ServerCfg{
166 PartialURL: xs.PartialURL,
167 ConnRetry: xs.ConnRetry,
168 Connected: xs.Connected,
169 Disabled: xs.Disabled,
173 // SendCommand Send a command to XDS Server
174 func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error {
176 if !strings.HasPrefix("/", cmd) {
179 return xs.client.Post(url, string(body), res)
182 // GetVersion Send Get request to retrieve XDS Server version
183 func (xs *XdsServer) GetVersion(res interface{}) error {
184 return xs.client.Get("/version", &res)
187 // GetFolders Send GET request to get current folder configuration
188 func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error {
189 return xs.client.Get("/folders", folders)
192 // FolderAdd Send POST request to add a folder
193 func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error {
194 err := xs.client.Post("/folders", fld, res)
196 return fmt.Errorf("FolderAdd error: %s", err.Error())
201 // FolderDelete Send DELETE request to delete a folder
202 func (xs *XdsServer) FolderDelete(id string) error {
203 return xs.client.HTTPDelete("/folders/" + id)
206 // FolderSync Send POST request to force synchronization of a folder
207 func (xs *XdsServer) FolderSync(id string) error {
208 return xs.client.HTTPPost("/folders/sync/"+id, "")
211 // FolderUpdate Send PUT request to update a folder
212 func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error {
213 return xs.client.Put("/folders/"+fld.ID, fld, resFld)
216 // CommandExec Send POST request to execute a command
217 func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error {
218 return xs.client.Post("/exec", args, res)
221 // CommandSignal Send POST request to send a signal to a command
222 func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error {
223 return xs.client.Post("/signal", args, res)
226 // CommandTgtTerminalGet Send GET request to retrieve info of a target terminals
227 func (xs *XdsServer) CommandTgtTerminalGet(targetID, termID string, res *xsapiv1.TerminalConfig) error {
228 return xs.client.Get("/targets/"+targetID+"/terminals/"+termID, res)
231 // CommandTgtTerminalOpen Send POST request to open a target terminal
232 func (xs *XdsServer) CommandTgtTerminalOpen(targetID string, termID string, res *xsapiv1.TerminalConfig) error {
233 var empty interface{}
234 return xs.client.Post("/targets/"+targetID+"/terminals/"+termID+"/open", &empty, res)
237 // CommandTgtTerminalSignal Send POST request to send a signal to a target terminal
238 func (xs *XdsServer) CommandTgtTerminalSignal(args *xsapiv1.TerminalSignalArgs, res *xsapiv1.TerminalConfig) error {
239 return xs.client.Post("/signal", args, res)
242 // SetAPIRouterGroup receives the gin router group of the API
// NOTE(review): the body is not visible in this excerpt; presumably r is
// stored in xs.apiRouter, which the Passthrough* helpers require - confirm.
243 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
247 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
// The route url is declared on xs.apiRouter; an error is logged when no
// router has been set. The handler forwards the request with xs.client.Get
// and answers http.StatusOK with the received data, or reports the failure
// through common.APIError.
248 func (xs *XdsServer) PassthroughGet(url string) {
249 if xs.apiRouter == nil {
250 xs.Log.Errorf("apiRouter not set !")
254 xs.apiRouter.GET(url, func(c *gin.Context) {
256 // Take care of param (eg. id in /projects/:id)
// A route pattern containing ":" cannot be forwarded as is: the path of the
// incoming request, without the xs.APIURL prefix, is used instead.
258 if strings.Contains(url, ":") {
259 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
262 if err := xs.client.Get(nURL, &data); err != nil {
// A refused connection is recognized by matching the error text.
263 if strings.Contains(err.Error(), "connection refused") {
266 common.APIError(c, err.Error())
270 c.JSON(http.StatusOK, data)
274 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
// The route url is declared on xs.apiRouter; an error is logged when no
// router has been set. The handler forwards the raw request body with
// xs.client.HTTPPostWithRes, decodes the JSON response and answers
// http.StatusOK with it; failures are reported through common.APIError.
275 func (xs *XdsServer) PassthroughPost(url string) {
276 if xs.apiRouter == nil {
277 xs.Log.Errorf("apiRouter not set !")
281 xs.apiRouter.POST(url, func(c *gin.Context) {
// The body is read once, in full, and forwarded as a string.
286 body, err := c.GetRawData()
288 common.APIError(c, err.Error())
292 // Take care of param (eg. id in /projects/:id)
// A route pattern containing ":" cannot be forwarded as is: the path of the
// incoming request, without the xs.APIURL prefix, is used instead.
294 if strings.Contains(url, ":") {
295 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
299 response, err := xs.client.HTTPPostWithRes(nURL, string(body))
// Any status other than exactly 200 (other 2xx included) is turned into an error.
303 if response.StatusCode != 200 {
// NOTE(review): the status text is used as a format string; a "%" in it would
// be misinterpreted. fmt.Errorf("%s", response.Status) would be safer.
304 err = fmt.Errorf(response.Status)
307 err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
312 c.JSON(http.StatusOK, data)
315 /* Handle error case */
// A refused connection is recognized by matching the error text.
317 if strings.Contains(err.Error(), "connection refused") {
320 common.APIError(c, err.Error())
325 // PassthroughPut Used to declare a route that sends directly a PUT request to XDS Server
// The route url is declared on xs.apiRouter; an error is logged when no
// router has been set. The handler forwards the raw request body with
// xs.client.HTTPPutWithRes, decodes the JSON response and answers
// http.StatusOK with it; failures are reported through common.APIError.
326 func (xs *XdsServer) PassthroughPut(url string) {
327 if xs.apiRouter == nil {
328 xs.Log.Errorf("apiRouter not set !")
332 xs.apiRouter.PUT(url, func(c *gin.Context) {
// The body is read once, in full, and forwarded as a string.
337 body, err := c.GetRawData()
339 common.APIError(c, err.Error())
343 // Take care of param (eg. id in /projects/:id)
// A route pattern containing ":" cannot be forwarded as is: the path of the
// incoming request, without the xs.APIURL prefix, is used instead.
345 if strings.Contains(url, ":") {
346 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
350 response, err := xs.client.HTTPPutWithRes(nURL, string(body))
// Any status other than exactly 200 (other 2xx included) is turned into an error.
354 if response.StatusCode != 200 {
// NOTE(review): the status text is used as a format string; a "%" in it would
// be misinterpreted. fmt.Errorf("%s", response.Status) would be safer.
355 err = fmt.Errorf(response.Status)
358 err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
363 c.JSON(http.StatusOK, data)
366 /* Handle error case */
// A refused connection is recognized by matching the error text.
368 if strings.Contains(err.Error(), "connection refused") {
371 common.APIError(c, err.Error())
376 // PassthroughDelete Used to declare a route that sends directly a Delete request to XDS Server
// The route url is declared on xs.apiRouter; an error is logged when no
// router has been set. The handler forwards the request with
// xs.client.HTTPDeleteWithRes, decodes the JSON response and answers
// http.StatusOK with it; failures are reported through common.APIError.
377 func (xs *XdsServer) PassthroughDelete(url string) {
378 if xs.apiRouter == nil {
379 xs.Log.Errorf("apiRouter not set !")
383 xs.apiRouter.DELETE(url, func(c *gin.Context) {
387 // Take care of param (eg. id in /projects/:id)
// A route pattern containing ":" cannot be forwarded as is: the path of the
// incoming request, without the xs.APIURL prefix, is used instead.
389 if strings.Contains(url, ":") {
390 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
394 response, err := xs.client.HTTPDeleteWithRes(nURL)
// Any status other than exactly 200 (other 2xx included) is turned into an error.
398 if response.StatusCode != 200 {
// NOTE(review): the status text is used as a format string; a "%" in it would
// be misinterpreted. fmt.Errorf("%s", response.Status) would be safer.
399 err = fmt.Errorf(response.Status)
402 err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
407 c.JSON(http.StatusOK, data)
410 /* Handle error case */
// A refused connection is recognized by matching the error text.
412 if strings.Contains(err.Error(), "connection refused") {
415 common.APIError(c, err.Error())
420 // EventRegister Post a request to register to an XdsServer event
// The request is sent on the "/events/register" route with a
// xsapiv1.EventRegisterArgs payload. NOTE(review): how evName and filter are
// mapped into that payload is not visible in this excerpt.
421 func (xs *XdsServer) EventRegister(evName string, filter string) error {
422 return xs.client.Post("/events/register",
423 xsapiv1.EventRegisterArgs{
430 // EventEmit Emit a event to XDS Server through WS
431 func (xs *XdsServer) EventEmit(message string, args ...interface{}) error {
432 if xs.ioSock == nil {
433 return fmt.Errorf("Io.Socket not initialized")
436 return xs.ioSock.Emit(message, args...)
439 // EventOn Register a callback on events reception
// f is called with privData and the event data each time evName is received.
// Registration fails with uuid.Nil when the socket is not initialized.
440 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
441 if xs.ioSock == nil {
442 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
// The lock is held for the rest of the function while sockEvents is updated.
445 xs.sockEventsLock.Lock()
446 defer xs.sockEventsLock.Unlock()
448 if _, exist := xs.sockEvents[evName]; !exist {
449 // Register listener only the first time
// A single socket listener per event name dispatches to every registered caller.
452 err := xs.ioSock.On(evn, func(data interface{}) error {
// Snapshot the listener list under the lock; the callbacks themselves run
// after the lock has been released.
453 xs.sockEventsLock.Lock()
454 sEvts := make([]*caller, len(xs.sockEvents[evn]))
455 copy(sEvts, xs.sockEvents[evn])
456 xs.sockEventsLock.Unlock()
457 for _, c := range sEvts {
// NOTE(review): the error returned by the listener is ignored here.
458 c.Func(c.PrivateData, data)
471 PrivateData: privData,
474 xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
475 xs.LogSillyf("XS EventOn: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
479 // EventOff Un-register a (or all) callbacks associated to an event
// Nothing is removed when evName has no entry in sockEvents.
// NOTE(review): the condition choosing between "all callbacks" and "only the
// callback matching id" is not visible in this excerpt.
480 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
481 xs.sockEventsLock.Lock()
482 defer xs.sockEventsLock.Unlock()
483 if _, exist := xs.sockEvents[evName]; exist {
// Drop every listener: the key is kept and mapped to an empty list.
486 xs.sockEvents[evName] = []*caller{}
488 // Un-register only the specified callback
489 for i, ff := range xs.sockEvents[evName] {
490 if uuid.Equal(ff.id, id) {
// Remove entry i by shifting the tail of the slice down.
// NOTE(review): modifying the slice being ranged over is only safe when the
// loop stops right after the removal - confirm.
491 xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
497 xs.LogSillyf("XS EventOff: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
501 // ProjectToFolder Convert Project structure to Folder structure
// The fields of pPrj are copied into a new xsapiv1.FolderConfig. The server
// path goes to DataPathMap and the project's single IsInSync/Status pair is
// used for both the local and the server side of DataCloudSync.
502 func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig {
504 if pPrj.Type == xsapiv1.TypeCloudSync {
// NOTE(review): the error returned by IDGet is discarded.
505 stID, _ = xs.SThg.IDGet()
507 // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
508 fPrj := xsapiv1.FolderConfig{
511 ClientPath: pPrj.ClientPath,
512 Type: xsapiv1.FolderType(pPrj.Type),
514 IsInSync: pPrj.IsInSync,
515 DefaultSdk: pPrj.DefaultSdk,
516 ClientData: pPrj.ClientData,
517 DataPathMap: xsapiv1.PathMapConfig{
518 ServerPath: pPrj.ServerPath,
520 DataCloudSync: xsapiv1.CloudSyncConfig{
522 STLocIsInSync: pPrj.IsInSync,
523 STLocStatus: pPrj.Status,
524 STSvrIsInSync: pPrj.IsInSync,
525 STSvrStatus: pPrj.Status,
532 // FolderToProject Convert Folder structure to Project structure
// For cloud sync folders the sync flag and the status are computed by merging
// the server side and local side values of DataCloudSync.
533 func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig {
534 inSync := fPrj.IsInSync
537 if fPrj.Type == xsapiv1.TypeCloudSync {
// In sync only when both the server side and the local side are in sync.
538 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
// Start from the server status, then let the local status adjust it.
540 sts = fPrj.DataCloudSync.STSvrStatus
541 switch fPrj.DataCloudSync.STLocStatus {
// A local error, disable or pause status always takes precedence.
542 case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause:
543 sts = fPrj.DataCloudSync.STLocStatus
// Local syncing is reported unless the server side is in error, disabled or paused.
545 case xaapiv1.StatusSyncing:
546 if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause {
547 sts = xaapiv1.StatusSyncing
550 case xaapiv1.StatusEnable:
556 pPrj := xaapiv1.ProjectConfig{
560 ClientPath: fPrj.ClientPath,
561 ServerPath: fPrj.DataPathMap.ServerPath,
562 Type: xaapiv1.ProjectType(fPrj.Type),
565 DefaultSdk: fPrj.DefaultSdk,
566 ClientData: fPrj.ClientData,
571 // CommandAdd Add a new command to the list of running commands
572 func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error {
573 if xs.CommandGet(cmdID) != nil {
574 return fmt.Errorf("command id already exist")
576 xs.cmdList[cmdID] = data
580 // CommandDelete Delete a command from the command list
581 func (xs *XdsServer) CommandDelete(cmdID string) error {
582 if xs.CommandGet(cmdID) == nil {
583 return fmt.Errorf("unknown command id")
585 delete(xs.cmdList, cmdID)
589 // CommandGet Retrieve a command data
// cmdID is looked up in xs.cmdList. CommandAdd and CommandDelete interpret a
// nil result as "no command with this id".
590 func (xs *XdsServer) CommandGet(cmdID string) interface{} {
591 d, exist := xs.cmdList[cmdID]
602 // Create HTTP client
// The client is created for xs.BaseURL with the "/api/v1" URL prefix and is
// stored in xs.client. Creation failures are returned as errors starting with
// "ERROR: cannot connect to XDS Server".
603 func (xs *XdsServer) _CreateConnectHTTP() error {
605 xs.client, err = common.HTTPNewClient(xs.BaseURL,
606 common.HTTPClientConfig{
607 URLPrefix: "/api/v1",
608 HeaderClientKeyName: "Xds-Sid",
611 LogPrefix: "XDSSERVER: ",
612 LogLevel: common.HTTPLogLevelWarning,
// NOTE(review): this call appears before the error and nil-client checks
// below; if HTTPNewClient can return a nil client this may dereference nil -
// confirm.
615 xs.client.SetLogLevel(xs.Log.Level.String())
618 msg := ": " + err.Error()
// For a refused connection the error text is replaced by the target URL.
619 if strings.Contains(err.Error(), "connection refused") {
620 msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
622 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
624 if xs.client == nil {
625 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
631 // _Reconnect Re-established connection
// It delegates to _Connect with reConn set to true.
632 func (xs *XdsServer) _Reconnect() error {
634 // Note that ConnectOn callback will be called (see apiv1.go file)
635 err := xs._Connect(true)
640 // _Connect Established HTTP and WS connection and retrieve XDSServer config
// reConn tells that the call is a reconnection, in which case a change of
// server ID is logged as a warning.
641 func (xs *XdsServer) _Connect(reConn bool) error {
// Fetching the server configuration also checks that the HTTP connection works.
643 xdsCfg := xsapiv1.APIConfig{}
644 if err := xs.client.Get("/config", &xdsCfg); err != nil {
652 if reConn && xs.ID != xdsCfg.ServerUID {
653 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID)
656 // Update local XDS config
// The ID (possibly a temporary one) is replaced by the UID reported by the server.
657 xs.ID = xdsCfg.ServerUID
658 xs.ServerConfig = &xdsCfg
660 // Establish WS connection and register listen
661 if err := xs._SocketConnect(); err != nil {
668 // Call OnConnect callback
669 if xs.cbOnConnect != nil {
677 // _SocketConnect Create WebSocket (io.socket) connection
// A socket.io client using the websocket transport is created for xs.BaseURL,
// then listeners are registered for the "error" and "disconnection" events.
678 func (xs *XdsServer) _SocketConnect() error {
680 xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
682 opts := &sio_client.Options{
683 Transport: "websocket",
684 Header: make(map[string][]string),
// The ID of the HTTP client is sent in the XDS-SID header of the socket request.
686 opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
688 iosk, err := sio_client.NewClient(xs.BaseURL, opts)
690 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
694 // Register some listeners
696 iosk.On("error", func(err error) {
697 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
698 if xs.CBOnError != nil {
703 iosk.On("disconnection", func(err error) {
704 xs.Log.Infof("IO.socket disconnection server %s (APIURL %s)", xs.ID, xs.APIURL)
// The optional user callback is given the disconnection error.
705 if xs.CBOnDisconnect != nil {
706 xs.CBOnDisconnect(err)
710 // Try to reconnect during 15min (or at least while not disabled)
// The loop runs as long as the server is neither disabled nor connected.
714 for !xs.Disabled && !xs.Connected {
719 if waitTime > 15*60 {
720 xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
// waitTime is expressed in seconds.
723 time.Sleep(time.Second * time.Duration(waitTime))
724 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
726 err := xs._Reconnect()
// "dial tcp ... connection refused" errors are excluded from the error log below.
728 !(strings.Contains(err.Error(), "dial tcp") && strings.Contains(err.Error(), "connection refused")) {
729 xs.Log.Errorf("ERROR while reconnecting: %v", err.Error())
736 // XXX - There is no connection event generated so, just consider that
737 // we are connected when NewClient return successfully
738 /* iosk.On("connection", func() { ... }) */
739 xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
744 // _Disconnected Set XDS Server as disconnected
// Every entry of sockEvents is removed while holding sockEventsLock.
745 func (xs *XdsServer) _Disconnected() error {
746 // Clear all register events as socket is closed
747 xs.sockEventsLock.Lock()
748 defer xs.sockEventsLock.Unlock()
// Deleting keys while ranging over a map is allowed in Go.
750 for k := range xs.sockEvents {
751 delete(xs.sockEvents, k)
759 // _NotifyState Send event to notify changes
// A xaapiv1.ServerCfg value built from the current server fields is emitted
// as a xaapiv1.EVTServerConfig event.
760 func (xs *XdsServer) _NotifyState() {
762 evSts := xaapiv1.ServerCfg{
766 PartialURL: xs.PartialURL,
767 ConnRetry: xs.ConnRetry,
768 Connected: xs.Connected,
// An emission failure is only logged as a warning.
770 if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil {
771 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)